taskList(); return array_key_exists($taskId, $tasks) ? $tasks[$taskId] : null; } public function findByName(string $className): ?array { return $this->findById( (int)self::$db->scalar(" SELECT pt.periodic_task_id FROM periodic_task pt WHERE pt.classname = ? ", $className ) ); } public function heading(): Util\SortableTableHeader { return $this->heading ??= new Util\SortableTableHeader( 'next', [ 'name' => ['dbColumn' => 'name', 'defaultSort' => 'asc', 'text' => 'Name'], 'period' => ['dbColumn' => 'period', 'defaultSort' => 'asc', 'text' => 'Interval'], 'runs' => ['dbColumn' => 'runs', 'defaultSort' => 'desc', 'text' => 'Runs'], 'duration' => ['dbColumn' => 'duration', 'defaultSort' => 'desc', 'text' => 'Duration'], 'processed' => ['dbColumn' => 'processed', 'defaultSort' => 'desc', 'text' => 'Processed'], 'status' => ['dbColumn' => 'status', 'defaultSort' => 'desc', 'text' => 'Status'], 'errors' => ['dbColumn' => 'errors', 'defaultSort' => 'desc', 'text' => 'Errors'], 'events' => ['dbColumn' => 'events', 'defaultSort' => 'desc', 'text' => 'Events'], 'last' => [ 'dbColumn' => 'last_run IS NULL ASC, is_enabled DESC, last_run', 'defaultSort' => 'desc', 'text' => 'Last Run' ], 'next' => [ 'dbColumn' => 'next_run IS NULL ASC, is_enabled DESC, run_now DESC, next_run', 'defaultSort' => 'asc', 'text' => 'Next Run' ], ] ); } public function taskDetailList(int $days = 7): array { self::$db->prepared_query(" SELECT pt.periodic_task_id, name, description, period, is_enabled, is_sane, run_now, coalesce(stats.runs, 0) AS runs, coalesce(stats.processed, 0) AS processed, coalesce(stats.errors, 0) AS errors, coalesce(events.events, 0) AS events, coalesce(pth.duration_ms, 0) AS duration, coalesce(pth.status, '') AS status, pth.launch_time AS last_run, pth.launch_time + INTERVAL period SECOND AS next_run FROM periodic_task pt LEFT JOIN ( SELECT periodic_task_id, max(periodic_task_history_id) AS latest, count(*) AS runs, sum(num_errors) AS errors, sum(num_items) AS processed FROM periodic_task_history WHERE launch_time > (now() - INTERVAL ? DAY) GROUP BY periodic_task_id ) stats USING (periodic_task_id) LEFT JOIN ( SELECT pth.periodic_task_id, count(*) AS events FROM periodic_task_history_event pthe INNER JOIN periodic_task_history pth ON (pthe.periodic_task_history_id = pth.periodic_task_history_id) WHERE pth.launch_time > (now() - INTERVAL ? DAY) GROUP BY pth.periodic_task_id ) events ON (pt.periodic_task_id = events.periodic_task_id) LEFT JOIN periodic_task_history pth ON (stats.latest = pth.periodic_task_history_id) ORDER BY {$this->heading()->orderBy()} {$this->heading()->dir()}, name ASC ", $days, $days ); return self::$db->to_array('periodic_task_id', MYSQLI_ASSOC); } public function taskList(): array { self::$db->prepared_query(" SELECT periodic_task_id, name, classname, description, period, is_enabled, is_sane, is_debug, run_now FROM periodic_task "); return self::$db->to_array('periodic_task_id', MYSQLI_ASSOC); } public function insaneTaskList(): int { return count(array_filter($this->taskList(), fn ($v) => !$v['is_sane'] )); } public static function isClassValid(string $class): bool { return class_exists('Gazelle\\Task\\' . $class); } public function enqueue(int $taskId): int { self::$db->prepared_query(" UPDATE periodic_task SET is_sane = true, run_now = true WHERE periodic_task_id = ? ", $taskId ); return self::$db->affected_rows(); } public function clear(int $taskId): int { self::$db->prepared_query(' UPDATE periodic_task SET run_now = false WHERE periodic_task_id = ? ', $taskId ); return self::$db->affected_rows(); } public function updateTask( int $taskId, string $name, string $class, string $description, int $period, bool $isEnabled, bool $isSane, bool $isDebug ): int { if (!self::isClassValid($class)) { return 0; } self::$db->prepared_query(" UPDATE periodic_task SET name = ?, classname = ?, description = ?, period = ?, is_enabled = ?, is_sane = ?, is_debug = ? WHERE periodic_task_id = ? ", $name, $class, $description, $period, (int)$isEnabled, (int)$isSane, (int)$isDebug, $taskId, ); return self::$db->affected_rows(); } public function taskRunTotal(int $taskId): int { return (int)self::$db->scalar(" SELECT count(*) FROM periodic_task_history WHERE periodic_task_id = ? ", $taskId ); } public function taskHistory( int $taskId, int $limit, int $offset, ): ?TaskScheduler\TaskHistory { self::$db->prepared_query(" SELECT periodic_task_history_id, launch_time, status, num_errors, num_items, duration_ms FROM periodic_task_history WHERE periodic_task_id = ? ORDER BY launch_time DESC LIMIT ? OFFSET ? ", $taskId, $limit, $offset ); $items = self::$db->to_array('periodic_task_history_id', MYSQLI_ASSOC); $historyEvents = []; if (count($items)) { self::$db->prepared_query(" SELECT periodic_task_history_id, event_time, severity, event, reference FROM periodic_task_history_event WHERE periodic_task_history_id IN (" . placeholders($items) . ") ORDER BY event_time, periodic_task_history_event_id ", ...array_keys($items)); $events = self::$db->to_array(false, MYSQLI_ASSOC); foreach ($events as $event) { [$historyId, $eventTime, $severity, $message, $reference] = array_values($event); $historyEvents[$historyId][] = new TaskScheduler\Event($severity, $message, $reference, $eventTime); } } $history = new TaskScheduler\TaskHistory( $this->findById($taskId)['name'], $this->taskRunTotal($taskId) ); foreach ($items as $item) { [$historyId, $launchTime, $status, $numErrors, $numItems, $duration] = array_values($item); $taskEvents = $historyEvents[$historyId] ?? []; $history->items[] = new TaskScheduler\HistoryItem($launchTime, $status, $numErrors, $numItems, $duration, $taskEvents); } return $history; } private function constructAxes(array $data, string $key, array $axes, bool $time): array { $result = []; foreach ($axes as $axis) { if (is_array($axis)) { $taskId = $axis[0]; $name = $axis[1]; } else { $taskId = $axis; $name = $axis; } $result[] = [ 'name' => $name, 'data' => array_map( fn ($v) => [$time ? (int)strtotime($v[$key]) * 1000 : $v[$key], (int)$v[$taskId]], $data ) ]; } return $result; } public function runtimeStats(int $days = 90): array { self::$db->prepared_query(" SELECT date_format(pth.launch_time, '%Y-%m-%d %H:00:00') AS date, sum(pth.duration_ms) AS duration, sum(pth.num_items) AS processed FROM periodic_task pt INNER JOIN periodic_task_history pth USING (periodic_task_id) WHERE pt.is_enabled IS TRUE AND pth.launch_time >= now() - INTERVAL 1 DAY GROUP BY 1 ORDER BY 1 "); $hourly = $this->constructAxes(self::$db->to_array(false, MYSQLI_ASSOC), 'date', ['duration', 'processed'], true); self::$db->prepared_query(" SELECT date(pth.launch_time) AS date, sum(pth.duration_ms) AS duration, sum(pth.num_items) AS processed FROM periodic_task pt INNER JOIN periodic_task_history pth USING (periodic_task_id) WHERE pt.is_enabled IS TRUE AND pth.launch_time BETWEEN curdate() - INTERVAL ? DAY AND curdate() GROUP BY 1 ORDER BY 1 ", $days ); $daily = $this->constructAxes(self::$db->to_array(false, MYSQLI_ASSOC), 'date', ['duration', 'processed'], true); self::$db->prepared_query(" SELECT pt.name, avg(pth.duration_ms) AS duration_avg, avg(pth.num_items) AS processed_avg FROM periodic_task pt INNER JOIN periodic_task_history pth USING (periodic_task_id) WHERE pt.is_enabled IS TRUE AND pth.launch_time BETWEEN curdate() - INTERVAL ? DAY AND curdate() GROUP BY 1 ORDER BY 1 ", $days ); $tasks = $this->constructAxes(self::$db->to_array(false, MYSQLI_ASSOC), 'name', ['duration_avg', 'processed_avg'], false); $totals = self::$db->rowAssoc(" SELECT count(pth.periodic_task_history_id) AS runs, sum(pth.duration_ms) AS duration, sum(pth.num_items) AS processed, count(pthe.periodic_task_history_event_id) AS events, sum(pth.num_errors) AS errors FROM periodic_task pt INNER JOIN periodic_task_history pth USING (periodic_task_id) LEFT JOIN periodic_task_history_event pthe USING (periodic_task_history_id) WHERE pt.is_enabled IS TRUE AND pth.launch_time BETWEEN curdate() - INTERVAL ? DAY AND curdate() ", $days ); return [ 'hourly' => $hourly, 'daily' => $daily, 'tasks' => $tasks, 'totals' => $totals, ]; } public function taskRuntimeStats(int $taskId, int $days = 90): array { self::$db->prepared_query(" SELECT date(pth.launch_time) AS date, sum(pth.duration_ms) AS duration, sum(pth.num_items) AS processed FROM periodic_task pt INNER JOIN periodic_task_history pth USING (periodic_task_id) WHERE pt.periodic_task_id = ? AND pth.launch_time BETWEEN curdate() - INTERVAL ? DAY AND curdate() GROUP BY 1 ORDER BY 1 ", $taskId, $days ); return $this->constructAxes(self::$db->to_array(false, MYSQLI_ASSOC), 'date', ['duration', 'processed'], true); } public function runNow(int $taskId): int { self::$db->prepared_query(" UPDATE periodic_task SET run_now = 1 - run_now WHERE periodic_task_id = ? ", $taskId ); return self::$db->affected_rows(); } public function run(): int { $pendingMigrations = array_filter( json_decode( (string)shell_exec(BIN_PHINX . ' status -c ' . PHINX_MYSQL . ' --format=json | tail -n 1'), true )['migrations'], fn ($value) => count($value) > 0 && $value['migration_status'] === 'down' ); if ($pendingMigrations) { Util\Irc::sendMessage(IRC_CHAN_DEV, 'Pending migrations found, scheduler cannot continue'); echo "Pending migrations found, aborting\n"; return 0; } /** * We attempt to run as many tasks as we can within a minute. If a task * runs over the TTL, it will be noted as in progress, so the next * invocation of the scheduler ignores it. When the task finally * returns, this invocation will exit. * If a task fails, do not try to run again in this slice. */ $fail = [0]; $run = 0; $TTL = microtime(true) + 58; while (microtime(true) < $TTL) { $taskId = (int)self::$db->scalar(" SELECT pt.periodic_task_id FROM periodic_task pt LEFT JOIN ( SELECT pth.periodic_task_id, max(pth.launch_time) AS launch_time FROM periodic_task_history pth WHERE pth.status = 'completed' GROUP BY pth.periodic_task_id ) last USING (periodic_task_id) WHERE pt.is_enabled IS TRUE AND pt.is_sane IS TRUE AND ( last.periodic_task_id is null OR last.launch_time + INTERVAL pt.period SECOND < now() OR pt.run_now IS TRUE ) AND NOT EXISTS ( SELECT 1 FROM periodic_task_history r WHERE r.status = 'running' AND r.periodic_task_id = pt.periodic_task_id ) AND pt.periodic_task_id NOT IN (" . placeholders($fail) . ") LIMIT 1 ", ...$fail ); if (!$taskId) { // no tasks remaining to be run break; } $run++; $result = $this->runTask($taskId); if ($result == -1) { $fail[] = $taskId; } } return $run; } public function runClass(string $className, bool $debug = false): int { $task = $this->findByName($className); if ($task === null) { return -1; } return $this->runTask($task['periodic_task_id'], $debug); } public function runTask(int $taskId, bool $debug = false): int { $task = $this->findById($taskId); if ($task === null) { return -1; } echo "Running task {$task['name']}..."; $taskRunner = $this->createRunner($taskId, $task['name'], $task['classname'], $task['is_debug'] || $debug); if ($taskRunner === null) { echo "DONE! (0.000)\n"; Util\Irc::sendMessage(IRC_CHAN_DEV, "Failed to construct task {$task['name']}"); return -1; } // some tasks require a viewer context (and probably should not...) $sysop = new \Gazelle\Manager\User()->findById( (int)self::$db->scalar(' SELECT um.id FROM users_main um INNER JOIN permissions p on (p.ID = um.PermissionID) ORDER BY p.level DESC LIMIT 1 ') ); $this->requestContext()->setViewer($sysop); $processed = -1; $taskRunner->begin(); try { $taskRunner->run(); } catch (\Throwable $e) { $taskRunner->log('Caught exception: ' . str_replace(SERVER_ROOT, '', $e->getMessage()), 'error'); } finally { $processed = $taskRunner->end($task['is_sane']); } if ($task['run_now']) { $this->clear($taskId); } return $processed; } private function createRunner(int $taskId, string $name, string $class, bool $isDebug): mixed { $class = 'Gazelle\\Task\\' . $class; if (!class_exists($class)) { return null; } return new $class($taskId, $name, $isDebug); } }