clock->now() >= $cronJobExecution->getLastEnd()->add(new \DateInterval('P1D')); } public function getKey(): string { return self::KEY; } public function run(array $lastExecutionData): ?array { $this->logger->info('Cronjob started: Canceling stale workflows.'); $olderThanDate = $this->clock->now()->sub(new \DateInterval(self::KEEP_INTERVAL)); $staleWorkflowIds = $this->workflowRepository->findWorkflowsWithoutFinalStepAndOlderThan($olderThanDate); $lastCanceled = self::LAST_CANCELED_WORKFLOW; $processedCount = 0; foreach ($staleWorkflowIds as $wId) { try { $this->messageBus->dispatch(new CancelStaleWorkflowMessage($wId)); $lastCanceled = max($wId, $lastCanceled); ++$processedCount; } catch (\Exception $e) { $this->logger->error("Failed to dispatch CancelStaleWorkflow for ID {$wId}", ['exception' => $e]); continue; } } $this->logger->info("Cronjob completed: {$processedCount} workflows processed."); return [self::LAST_CANCELED_WORKFLOW => $lastCanceled]; } }