clock->now() >= $cronJobExecution->getLastEnd()->add(new \DateInterval('P1D')); } public function getKey(): string { return self::KEY; } public function run(array $lastExecutionData): ?array { $this->logger->info('Cronjob started: Canceling stale workflows.'); $olderThanDate = $this->clock->now()->sub(new \DateInterval(self::KEEP_INTERVAL)); $staleEntityWorkflows = $this->workflowRepository->findWorkflowsWithoutFinalStepAndOlderThan($olderThanDate); $lastCanceled = $lastExecutionData[self::LAST_CANCELED_WORKFLOW] ?? 0; $processedCount = 0; foreach ($staleEntityWorkflows as $staleEntityWorkflow) { try { $this->messageBus->dispatch(new CancelStaleWorkflowMessage($staleEntityWorkflow->getId())); $lastCanceled = max($staleEntityWorkflow->getId(), $lastCanceled); ++$processedCount; } catch (\Exception $e) { $this->logger->error('Failed to dispatch CancelStaleWorkflow', ['exception' => $e, 'entityWorkflowId' => $staleEntityWorkflow->getId()]); continue; } if (0 === $processedCount % 10) { $this->entityManager->clear(); } } $this->logger->info("Cronjob completed: {$processedCount} workflows processed."); return [self::LAST_CANCELED_WORKFLOW => $lastCanceled]; } }