• File: class-updraft-task-manager.php
  • Full Path: /home/atelipy/www/wpmain/languages/assets/updraft-tasks/class-updraft-task-manager.php
  • File size: 9.06 KB
  • MIME-type: text/x-php
  • Charset: utf-8
<?php
/**
 * A task manager that locks and processes the task queue
 */

if (!defined('ABSPATH')) die('Access denied.');

if (!class_exists('Updraft_Task_Manager_1_4')) :

abstract class Updraft_Task_Manager_1_4 {

	protected $loggers;

	public $commands;

	private $queue_semaphore;
	
	/**
	 * Set this to the number of seconds for the lock timeout, or 0 to not use a lock
	 */
	protected $use_per_task_lock = 0;

	/**
	 * The Task Manager constructor
	 */
	public function __construct() {

		if (!class_exists('Updraft_Task_1_2')) require_once('class-updraft-task.php');
		if (!class_exists('Updraft_Task_Manager_Commands_1_0')) require_once('class-updraft-task-manager-commands.php');
		if (!class_exists('Updraft_Semaphore_3_0')) require_once(dirname(__FILE__).'/../updraft-semaphore/class-updraft-semaphore.php');
		if (!class_exists('Updraft_Tasks_Activation')) require_once(dirname(__FILE__).'/class-updraft-tasks-activation.php');

		$this->commands = new Updraft_Task_Manager_Commands_1_0($this);

		do_action('updraft_task_manager_loaded', $this);
	}

	/**
	 * Process a single task in the queue
	 *
	 * @param int|Updraft_Task - $task Task ID or Updraft_Task object.
	 * @return boolean|WP_Error - status of task or error if task not found
	 */
	public function process_task($task) {

		if (!is_a($task, 'Updraft_Task_1_2')) {
			$task_id = (int) $task;
			$task = $this->get_task_instance($task_id);
		}
		
		if (!$task) return new WP_Error('id_invalid', 'Task not found or ID is invalid');

		return $task->attempt(apply_filters('updraft_task_lock_for', $this->use_per_task_lock, $this));
		
	}

	/**
	 * Gets a list of all tasks that matches the $status flag
	 *
	 * @param int|Updraft_Task - $task Task ID or Updraft_Task object.
	 * @return String|WP_Error - status of task or error if task not found.
	 */
	public function get_task_status($task) {

		if (!($task instanceof Updraft_Task_1_2)) {
			$task_id = (int) $task;
			$task = $this->get_task_instance($task_id);
		}
		
		if (!$task) return new WP_Error('id_invalid', 'Task not found or ID is invalid');

		return $task->get_status();
	}

	/**
	 * Ends a given task
	 *
	 * @param int|Updraft_Task - $task Task ID or Updraft_Task object.
	 * @return boolean|WP_Error - Status of the operation or error if task not found.
	 */
	public function end_task($task) {
		
		if (!($task instanceof Updraft_Task_1_2)) {
			$task_id = (int) $task;
			$task = $this->get_task_instance($task_id);
		}
		
		if (!$task) return new WP_Error('id_invalid', 'Task not found or ID is invalid');

		return $task->complete();
	}

    /**
     * Process a the queue of a specifed task type
     *
     * @param string $type queue type to process
     * @return bool true on success, false otherwise
     */
	public function process_queue($type) {

		$task_list = $this->get_active_tasks($type);
		$total = is_array($task_list) ? count($task_list) : 0;

		if (1 > $total) {
			$this->log(sprintf('The queue for tasks of type "%s" is empty. Aborting!', $type));
			return true;
		} else {
			$this->log(sprintf('A total of %d tasks of type %s found and will be processed in this iteration', $total, $type));
		}

		$this->queue_semaphore = new Updraft_Semaphore_3_0($type);
		
		// Prevent PHP warning which trigger from semaphore class set_loggers method if $this->loggers is empty
		if (!empty($this->loggers)) {
			$this->queue_semaphore->set_loggers($this->loggers);
		}

		if (!$this->queue_semaphore->lock()) {

			$this->log(sprintf('Failed to gain semaphore lock (%s) - another process is already processing the queue - aborting (if this is wrong - i.e. if the other process crashed without removing the lock, then another can be started after 1 minute', $type));

			return false;
		}

		$done = 0;
		foreach ($task_list as $task) {
			$this->process_task($task);
			$done++;
			/**
			 * Filters if the queue should be interrupted. Used after processing each task.
			 *
			 * @param boolean $interrupt_queue - If the queue should be interrupted. Default to FALSE
			 * @param object  $task            - The current task object
			 * @param object  $task_manager    - The task manager instance
			 */
			if (apply_filters('updraft_interrupt_tasks_queue_'.$type, false, $task, $this)) {
				break;
			}
		}

		$this->queue_semaphore->release();
		$this->log(sprintf('Successfully processed the queue (%s). %d tasks were processed out of %d.', $type, $done, $total));
		$this->queue_semaphore->delete();

		return $done == $total;
	}

	/**
	 * Cleans out all complete tasks from the DB.
	 *
	 * @param String $type type of the task
	 */
	public function clean_up_old_tasks($type) {
		$completed_tasks = $this->get_completed_tasks($type);

		if (!$completed_tasks) return false;

		$this->log(sprintf('Cleaning up tasks of type (%s). A total of %d tasks will be deleted.', $type, count($completed_tasks)));

		foreach ($completed_tasks as $task) {
			$task->delete_meta();
			$task->delete();
		}

		return true;
	}

	/**
	 * Delete all tasks from queue.
	 *
	 * @param string $type
	 *
	 * @return boolean|integer Number of rows deleted, or (boolean)false upon error
	 */
	public function delete_tasks($task_type) {
		global $wpdb;

		$sql = "DELETE t, tm FROM `{$wpdb->base_prefix}tm_tasks` t LEFT JOIN `{$wpdb->base_prefix}tm_taskmeta` tm ON t.id = tm.task_id WHERE t.type = '{$task_type}'";

		return $wpdb->query($sql);
	}

	/**
	 * Get count of completed and all tasks.
	 *
	 * @return array - [ ['complete_tasks' => , 'all_tasks' => ] ]
	 */
	public function get_status($task_type) {
		global $wpdb;

		$query = $wpdb->prepare(
			"SELECT complete_tasks, all_tasks FROM (SELECT COUNT(*) AS complete_tasks FROM {$wpdb->base_prefix}tm_tasks WHERE `type` = %s AND `status` = %s) a, (SELECT COUNT(*) AS all_tasks FROM {$wpdb->base_prefix}tm_tasks WHERE `type` = %s) b",
			array(
				$task_type,
				'complete',
				$task_type,
			)
		);

		$status = $wpdb->get_row($query, ARRAY_A);

		if (empty($status)) {
			$status = array(
				'complete_tasks' => 0,
				'all_tasks' => 0,
			);
		}

		return $status;
	}

	/**
	 * Fetches  a list of all active tasks
	 *
	 * @param String $type type of the task
	 * @return Mixed - array of Task ojects or NULL if none found
	 */
	public function get_active_tasks($type) {
		return $this->get_tasks('active', $type);
	}

	/**
	 * Gets a list of all completed tasks
	 *
	 * @param String $type type of the task
	 * @return Mixed - array of Task ojects or NULL if none found
	 */
	public function get_completed_tasks($type) {
		return $this->get_tasks('complete', $type);
	}

	/**
	 * Gets a list of all tasks that matches the $status flag
	 *
	 * @param String $status - status of tasks to return, defaults to all tasks
	 * @param String $type   - type of task
	 *
	 * @return Mixed - array of Task objects or NULL if none found
	 */
	public function get_tasks($status, $type) {
		global $wpdb;

		$tasks = array();
		
		if (array_key_exists($status, Updraft_Task_1_2::get_allowed_statuses())) {
			$sql = $wpdb->prepare("SELECT * FROM {$wpdb->base_prefix}tm_tasks WHERE status = %s AND type = %s", $status, $type);
		} else {
			$sql = $wpdb->prepare("SELECT * FROM {$wpdb->base_prefix}tm_tasks WHERE type = %s", $type);
		}

		$_tasks = $wpdb->get_results($sql);

		if (!$_tasks) {
			// if we got an error then check if task manager tables are in the database
			// and recreate them if needed.
			if ($wpdb->last_error) {
				Updraft_Tasks_Activation::reinstall_if_needed();
			}
			return;
		}


		foreach ($_tasks as $_task) {
			$task = $this->get_task_instance($_task->id);
			if ($task) array_push($tasks, $task);
		}

		return $tasks;
	}

	/**
	 * Retrieve the task instance using its ID
	 *
	 * @access public
	 *
	 * @global wpdb $wpdb WordPress database abstraction object.
	 *
	 * @param int $task_id Task ID.
	 * @return Task|Boolean Task object, false otherwise.
	 */
	public function get_task_instance($task_id) {
		global $wpdb;

		$task_id = (int) $task_id;
		if (!$task_id) return false;

		$sql = $wpdb->prepare("SELECT * FROM {$wpdb->base_prefix}tm_tasks WHERE id = %d LIMIT 1", $task_id);
		$_task = $wpdb->get_row($sql);

		if (!$_task)
			return false;

		$class_identifier = $_task->class_identifier;

		if (class_exists($class_identifier)) {
			$task_instance = new $class_identifier($_task);
			$task_instance->set_loggers($this->loggers);
			return $task_instance;
		}

		return false;
	}

	/**
	 * Sets the logger for this instance.
	 *
	 * @param array $loggers - the loggers for this task
	 */
	public function set_loggers($loggers) {
		foreach ($loggers as $logger) {
			$this->add_logger($logger);
		}
	}

	/**
	 * Add a logger to loggers list
	 *
	 * @param Object $logger - a logger for the instance
	 */
	public function add_logger($logger) {
		$this->loggers[] = $logger;
	}

	/**
	 * Return list of loggers
	 *
	 * @return array
	 */
	public function get_loggers() {
		return $this->loggers;
	}

	/**
	 * Captures and logs any interesting messages
	 *
	 * @param String $message    - the error message
	 * @param String $error_type - the error type
	 */
	public function log($message, $error_type = 'info') {
		if (isset($this->loggers)) {
			foreach ($this->loggers as $logger) {
				$logger->log($message, $error_type);
			}
		}
	}

}

endif;