--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/multithreading.php Tue Sep 23 23:24:13 2008 -0400
@@ -0,0 +1,336 @@
+<?php
+
+/**
+ * Multi-threading (well sort of) tools
+ *
+ * Greyhound - real web management for Amarok
+ * Copyright (C) 2008 Dan Fuhry
+ *
+ * This program is Free Software; you can redistribute and/or modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
+ */
+
+require_once('json.php');
+
+/**
+ * Global signal handler for SIGCHLD.
+ */
+
+function Threader_SigChld()
+{
+ global $threader_instances;
+ foreach ( $threader_instances as &$mt )
+ {
+ if ( is_object($mt) )
+ {
+ $mt->event_sigchld();
+ }
+ }
+}
+
+/**
+ * Global signal handler for SIGUSR2.
+ */
+
+function Threader_SigUsr2()
+{
+ global $threader_instances;
+ foreach ( $threader_instances as &$mt )
+ {
+ if ( is_object($mt) )
+ {
+ $parchild = $mt->is_child() ? 'child' : 'parent';
+ $mt->event_sigusr2();
+ }
+ }
+}
+
+/**
+ * List of Threader instances. Needed for global handling of signals.
+ * @var array
+ */
+
+global $threader_instances;
+$threader_instances = array();
+
+/**
+ * Tools for emulating multi-threaded operation in PHP scripts.
+ * @package Amarok
+ * @subpackage WebControl
+ * @author Dan Fuhry
+ * @license GNU General Public License <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>
+ */
+
+class Threader
+{
+
+ /**
+ * Return value of fork() if the process is a child.
+ * @const int
+ */
+
+ const FORK_CHILD = -1;
+
+ /**
+ * Set to true if this is a child process. No exceptions.
+ * @var bool
+ * @access private
+ */
+
+ private $is_child = false;
+
+ /**
+ * Sockets for inter-process communication.
+ * @var array
+ * @access private
+ */
+
+ protected $ipc_sockets = array();
+
+ /**
+ * Socket for communication with the parent. Obviously only used after calling fork().
+ * @var resource
+ * @access private
+ */
+
+ protected $parent_sock = false;
+
+ /**
+ * Services_JSON instance.
+ * @var object
+ * @access private
+ */
+
+ protected $json = false;
+
+ /**
+ * PID of the parent process.
+ * @var int
+ * @access private
+ */
+
+ protected $parent_pid = 1;
+
+ /**
+ * List of actions for IPC events.
+ * @var array
+ * @access private
+ */
+
+ protected $ipc_actions = array();
+
+ /**
+ * Constructor. Sets up signal handlers. Nothing to see here, move along.
+ */
+
+ public function __construct()
+ {
+ global $threader_instances;
+
+ declare(ticks=1);
+
+ $threader_instances[] =& $this;
+
+ pcntl_signal(SIGUSR2, 'Threader_SigUsr2');
+ pcntl_signal(SIGCHLD, 'Threader_SigChld');
+
+ $this->json = new Services_JSON(SERVICES_JSON_LOOSE_TYPE);
+ $this->parent_pid = getmypid();
+ }
+
+ /**
+ * Forks the current process. See your system's fork(2) man page for details.
+ * @return int FORK_CHILD if child process, PID of child if parent process. Returns false on failure.
+ */
+
+ public function fork()
+ {
+ // create our new sockets for IPC
+ $socket_pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
+ // fork (emoticon of the day: --<E)
+ $fork_result = pcntl_fork();
+ if ( $fork_result == -1 )
+ {
+ // fork failed.
+ return false;
+ }
+ else if ( $fork_result )
+ {
+ // we are the parent - register the child
+ fclose($socket_pair[0]);
+ $this->ipc_sockets[$fork_result] = $socket_pair[1];
+ return $fork_result;
+ }
+ else
+ {
+ // we are the child.
+ fclose($socket_pair[1]);
+ $this->parent_sock = $socket_pair[0];
+ $this->is_child = true;
+ return FORK_CHILD;
+ }
+ }
+
+ /**
+ * Are we a child?
+ * @return bool
+ */
+
+ public function is_child()
+ {
+ return $this->is_child;
+ }
+
+ /**
+ * Register an action so that when it is fired over IPC, a custom function can be called.
+ * @param string Action
+ * @param callback Function to call
+ * @return true on success, false on failure
+ */
+
+ function ipc_register($action, $callback)
+ {
+ if ( !is_string($action) || empty($action) || !is_callable($callback) )
+ {
+ return false;
+ }
+ $this->ipc_actions[$action] = $callback;
+ return true;
+ }
+
+ /**
+ * Send through an IPC event. If this is a child, it only notifies the parent; if we're the parent, all children are notified.
+ * @param array Data to be sent through. This must be an associative array containing an "action" key at minimum. If this a key "propagate" set to true, a parent that receives this will propagate the message to all children.
+ * @return null
+ */
+
+ function ipc_send($data)
+ {
+ if ( !isset($data['action']) )
+ {
+ return false;
+ }
+ $data = $this->json->encode($data);
+ if ( $this->is_child() )
+ {
+ fwrite($this->parent_sock, "$data\n");
+ // signal the parent that we've got an event
+ posix_kill($this->parent_pid, SIGUSR2);
+ }
+ else
+ {
+ // signal each child
+ foreach ( $this->ipc_sockets as $pid => $socket )
+ {
+ fwrite($socket, "$data\n");
+ posix_kill($pid, SIGUSR2);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Handler for SIGCHLD events.
+ * @access private
+ */
+
+ function event_sigchld()
+ {
+ // this should never happen to children.
+ if ( $this->is_child() )
+ {
+ return null;
+ }
+
+ // wait for child to exit.
+ pcntl_wait($status);
+ // for each child PID, kill with signal 0 (effectively, test if process is alive)
+ // if posix_kill fails, it's dead so remove it from the list.
+ foreach ( $this->ipc_sockets as $pid => $socket )
+ {
+ if ( !@posix_kill($pid, 0) )
+ {
+ // signal failed.
+ fclose($socket);
+ unset($this->ipc_sockets[$pid]);
+ }
+ }
+ }
+
+ /**
+ * Handler for IPC events.
+ * @access private
+ */
+
+ function event_sigusr2()
+ {
+ if ( $this->is_child() )
+ {
+ // this is easy - the parent sent the signal.
+ $command = rtrim(fgets($this->parent_sock, 102400), "\n");
+ }
+ else
+ {
+ // since we can't find which PID sent the signal, set the timeout to a very small amount
+ // of time and try to read; if we get something, awesome.
+ foreach ( $this->ipc_sockets as $pid => $socket )
+ {
+ // 1000 microseconds = 1/80th of the time it takes you to blink.
+ @stream_set_timeout($socket, 0, 1000);
+ $command = rtrim(@fgets($socket, 102400), "\n");
+ if ( !empty($command) )
+ {
+ break;
+ }
+ }
+ }
+ if ( empty($command) )
+ {
+ // hmm, got a sigusr2 without an incoming command. oh well, ignore.
+ return null;
+ }
+ $command = $this->json->decode($command);
+ if ( !isset($command['action']) )
+ {
+ // no action = no way to figure out how to handle this.
+ return null;
+ }
+ if ( !isset($this->ipc_actions[$command['action']]) )
+ {
+ // action not registered
+ return null;
+ }
+ // should we propagate this event?
+ if ( !$this->is_child() && isset($command['propagate']) && $command['propagate'] === true )
+ {
+ $this->ipc_send($command);
+ }
+ // we're good
+ @call_user_func($this->ipc_actions[$command['action']], $command, $this);
+ }
+
+ /**
+ * Kills all child processes.
+ * @access public
+ */
+
+ public function kill_all_children()
+ {
+ foreach ( $this->ipc_sockets as $pid => $socket )
+ {
+ $socklen = count($this->ipc_sockets);
+ posix_kill($pid, SIGTERM);
+ // wait until we are conscious of this child's death
+ while ( count($this->ipc_sockets) >= $socklen )
+ {
+ usleep(20000);
+ }
+ }
+ }
+
+}
+
+?>