Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions Command/PurgeQueuesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ protected function configure()
->setDescription('Purge queues')
->addOption('file', 'f', InputOption::VALUE_REQUIRED, 'File to read')
->addOption('force', null, InputOption::VALUE_NONE, 'If set, the task will not ask for confirm purge')
->addOption('priority', 'p', InputOption::VALUE_OPTIONAL, 'Get messages from specific priority')
->addArgument('queues', InputArgument::IS_ARRAY, 'queues to purge')
->setHelp(<<<HELP
This command purges queues.
Expand All @@ -52,9 +53,10 @@ protected function configure()
/**
* @param QueueClientInterface $queueClient
* @param string $fileName
* @param string|null $priority
* @return int
*/
private function purgeFromFile($queueClient, $fileName)
private function purgeFromFile($queueClient, $fileName, $priority)
{
try {
$processor = new Processor();
Expand All @@ -67,17 +69,17 @@ private function purgeFromFile($queueClient, $fileName)
return 1;
}
array_walk_recursive($processedConfiguration, 'ReputationVIP\Bundle\QueueClientBundle\QueueClientFactory::resolveParameters', $this->getContainer());
$this->output->write('Start delete queue.', Output::INFO);
$this->output->write('Start purge queue.', Output::INFO);
foreach ($processedConfiguration[QueuesConfiguration::QUEUES_NODE] as $queue) {
$queueName = $queue[QueuesConfiguration::QUEUE_NAME_NODE];
try {
$queueClient->deleteQueue($queueName);
$this->output->write('Queue ' . $queueName . ' deleted.', Output::INFO);
$queueClient->purgeQueue($queueName, $priority);
$this->output->write('Queue ' . $queueName . ' purged.', Output::INFO);
} catch (\Exception $e) {
$this->output->write($e->getMessage(), Output::WARNING);
}
}
$this->output->write('End delete queue.', Output::INFO);
$this->output->write('End purge queue.', Output::INFO);

return 0;
}
Expand All @@ -91,6 +93,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
{
$helper = $this->getHelper('question');
$force = $input->getOption('force') ? true : false;

try {
/** @var LoggerInterface $logger */
$logger = $this->getContainer()->get('logger');
Expand All @@ -106,14 +109,21 @@ protected function execute(InputInterface $input, OutputInterface $output)

return 1;
}
$priority = null;
if ($input->getOption('priority')) {
$priority = $input->getOption('priority');
if (!in_array($priority, $queueClient->getPriorityHandler()->getAll())) {
throw new \InvalidArgumentException('Priority "' . $priority . '" not found.');
}
}
if ($input->getOption('file')) {
$fileName = $input->getOption('file');
if (!($force || $helper->ask($input, $output, new ConfirmationQuestion('Purge queues in file "' . $fileName . '"?', false)))) {

return 0;
}

return $this->purgeFromFile($queueClient, $fileName);
return $this->purgeFromFile($queueClient, $fileName, $priority);
} else {
$queues = $input->getArgument('queues');
if (count($queues)) {
Expand All @@ -123,7 +133,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
}
foreach ($queues as $queue) {
try {
$queueClient->purgeQueue($queue);
$queueClient->purgeQueue($queue, $priority);
$this->output->write('Queue ' . $queue . ' purged.', Output::INFO);
} catch (\Exception $e) {
$this->output->write($e->getMessage(), Output::WARNING);
Expand All @@ -139,7 +149,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
return 0;
}

return $this->purgeFromFile($queueClient, $fileName);
return $this->purgeFromFile($queueClient, $fileName, $priority);
} catch (InvalidArgumentException $e) {
$this->output->write('No queue_client.queues_file parameter found.', Output::CRITICAL);

Expand Down