package MangoX::Queue;

use Mojo::Base 'Mojo::EventEmitter';

use Carp 'croak';
use DateTime;
use DateTime::Duration;
use Mojo::Log;
use Mango::BSON ':bson';
use MangoX::Queue::Delay;

no warnings 'experimental::smartmatch';

our $VERSION = '0.03';

# Logger for diagnostics; the default logger only reports errors
has log => sub { Mojo::Log->new->level('error') };

# The Mango::Collection that stores the queued jobs (no default)
has 'collection';

# MangoX::Queue::Delay used to pace repeated queue polls
has delay => sub { MangoX::Queue::Delay->new };

# Seconds a job may stay 'Retrieved' before it is treated as failed;
# MANGOX_QUEUE_JOB_TIMEOUT overrides the default of 60
has timeout => sub { $ENV{MANGOX_QUEUE_JOB_TIMEOUT} // 60 };

# Number of attempts allowed per job before giving up;
# MANGOX_QUEUE_JOB_RETRIES overrides the default of 5
has retries => sub { $ENV{MANGOX_QUEUE_JOB_RETRIES} // 5 };

# Maps consumer IDs to Mojo::IOLoop->timer IDs
has consumers => sub { return {} };

# Maps plugin class names to loaded plugin instances
has plugins => sub { return {} };

# Construct a queue object.
#
# Arguments are passed straight to the parent constructor; 'collection'
# must be supplied.
# Croaks unless the collection attribute is a Mango::Collection.
sub new {
	my $self = shift->SUPER::new(@_);

	croak qq{No Mango::Collection provided to constructor} unless ref($self->collection) eq 'Mango::Collection';

	return $self;
}

# Load and instantiate a plugin class, storing it under its class name.
#
# $name    - plugin class name, e.g. 'MangoX::Queue::Plugin::Statsd'
# $options - optional hash reference passed (flattened) to the plugin's new
#
# Returns the plugin instance.
# Croaks if the plugin is already loaded or its class cannot be loaded.
sub plugin {
	my ($self, $name, $options) = @_;

	croak qq{Plugin $name already loaded} if exists $self->plugins->{$name};

	# Only load from disk when the class is not already available
	unless($name->can('new')) {
		# Require by file path instead of string-eval, so the plugin name
		# is never evaluated as Perl code
		(my $file = "$name.pm") =~ s{::}{/}g;
		eval { require $file; 1 } or croak qq{Failed to load plugin $name: $@};
	}

	my $plugin = $name->new(%{ $options // {} });
	$self->plugins->{$name} = $plugin;

	# NOTE(review): the plugin is handed the queue through an optional
	# register hook so it can subscribe to events - confirm this matches
	# the plugin API
	$plugin->register($self) if $plugin->can('register');

	return $plugin;
}

# Build the options hash passed to find_and_modify when claiming a job.
#
# The query selects jobs that are 'Pending', or 'Retrieved' with a
# retrieved time older than the configured timeout. The update marks the
# job 'Retrieved', stamps the retrieval time and increments 'attempt'.
#
# Returns a hash reference with the keys query, update, sort and new.
sub get_options {
	my ($self) = @_;

	# Take one snapshot of the current time so the stale cutoff and the
	# retrieval stamp agree. subtract_duration modifies its invocant,
	# hence the clone.
	my $now = DateTime->now;
	my $stale_before = $now->clone->subtract_duration(DateTime::Duration->new(seconds => $self->timeout));

	return {
		query => {
			'$or' => [{
				status => {
					'$in' => [ 'Pending' ]
				}
			},{
				status => {
					'$in' => [ 'Retrieved' ]
				},
				retrieved => {
					'$lt' => $stale_before
				}
			}]
		},
		update => {
			'$set' => {
				status => 'Retrieved',
				retrieved => $now,
			},
			'$inc' => {
				attempt => 1,
			}
		},
		# Sort by priority, then by creation time.
		# NOTE(review): created => -1 is a descending sort (newest first) -
		# confirm this is the intended order
		sort => bson_doc(
			'priority' => 1,
			'created' => -1,
		),
		new => 0, # Get the original object (so we can see status etc)
	};
}

# Add a job to the queue.
#
# Accepted argument forms:
#   - 'job_name'
#   - foo => bar, 'job_name'
#   - 'job_name', $callback
#   - foo => bar, 'job_name', $callback
#
# Recognised options are priority (default 1), created (default now) and
# status (default 'Pending'). The job is stored under the 'data' key with
# attempt set to 1.
#
# Blocking mode returns the stored job hash (including _id). With a
# callback, the insert is non-blocking and the callback receives the
# stored job hash. An 'enqueued' event is emitted in both modes when it
# has subscribers.
sub enqueue {
	my ($self, @args) = @_;

	my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
	my $job = pop @args;
	my %args;
	%args = (@args) if scalar @args;

	my $db_job = {
		priority => $args{priority} // 1,
		created => $args{created} // DateTime->now,
		data => $job,
		status => $args{status} // 'Pending',
		attempt => 1,
	};

	if($callback) {
		return $self->collection->insert($db_job => sub {
			my ($collection, $error, $oid) = @_;
			$db_job->{_id} = $oid;
			$self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
			# Hand the stored job to the caller; previously the callback
			# was accepted but never invoked
			$callback->($db_job);
		});
	} else {
		my $id = $self->collection->insert($db_job);
		$db_job->{_id} = $id;
		$self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
		return $db_job;
	}
}

# Wait for a job to reach a given status.
#
#   watch $queue $id, 'Status' => $callback
#
# $id_or_job - a job ID, or a job hash whose _id is used
# $status    - status to wait for; defaults to 'Complete'
# $callback  - optional; selects non-blocking mode
#
# Non-blocking mode schedules the first check on the Mojo::IOLoop and
# returns the timer ID. Blocking mode returns the result of
# _watch_blocking.
sub watch {
	my ($self, $id_or_job, $status, $callback) = @_;

	my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

	$status //= 'Complete';

	if($callback) {
		# Non-blocking
		$self->log->debug("Waiting for $id on status $status in non-blocking mode");
		return Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
	} else {
		# Blocking
		$self->log->debug("Waiting for $id on status $status in blocking mode");
		return $self->_watch_blocking($id, $status);
	}
}

# Poll the collection until the job with the given ID has a wanted status.
#
# $id     - job _id to look up
# $status - a status string, or an array reference of acceptable statuses
#
# Returns 1 once the job's status matches. Does not return while the job
# is missing or in another status.
sub _watch_blocking {
	my ($self, $id, $status) = @_;

	# Normalise to a list; plain string equality replaces the deprecated
	# smartmatch operator
	my @wanted = ref($status) eq 'ARRAY' ? @$status
	           : ref($status)            ? ()
	           :                           ($status);

	while(1) {
		my $doc = $self->collection->find_one({'_id' => $id});
		$self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

		if($doc && defined $doc->{status}) {
			my $current = $doc->{status};
			return 1 if grep { $current eq $_ } @wanted;
		}

		# Pause between polls instead of spinning on the database
		$self->delay->wait;
	}
}

# Non-blocking counterpart of _watch_blocking.
#
# Looks the job up once. If its status matches, the callback is invoked
# with the job document. Otherwise another check is scheduled through the
# delay object, provided the Mojo::IOLoop is still running.
#
# $id       - job _id to look up
# $status   - a status string, or an array reference of acceptable statuses
# $callback - code reference invoked with the job document on a match
sub _watch_nonblocking {
	my ($self, $id, $status, $callback) = @_;

	# Plain string equality replaces the deprecated smartmatch operator
	my @wanted = ref($status) eq 'ARRAY' ? @$status
	           : ref($status)            ? ()
	           :                           ($status);

	$self->collection->find_one({'_id' => $id} => sub {
		my ($cursor, $err, $doc) = @_;
		$self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

		my $matched = 0;
		if($doc && defined $doc->{status}) {
			my $current = $doc->{status};
			$matched = 1 if grep { $current eq $_ } @wanted;
		}

		if($matched) {
			$self->log->debug("Status is $status");
			# Notify the watcher; previously the callback was never invoked
			$callback->($doc);
		} else {
			$self->log->debug("Job not found or status doesn't match");
			$self->delay->wait(sub {
				return unless Mojo::IOLoop->is_running;
				Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
			});
			return undef;
		}
	});
}

# Put a job back on the queue by setting its status to 'Pending'.
#
# $job      - job hash; its status key is modified in place
# $callback - optional, passed on to update
#
# Returns whatever update returns.
sub requeue {
	my ($self, $job, $callback) = @_;

	$job->{status} = 'Pending';
	return $self->update($job, $callback);
}

# Remove a job from the queue collection.
#
# $id_or_job - a job ID, or a job hash whose _id is used
# $callback  - optional; selects non-blocking mode and is invoked (with
#              no arguments) once the job has been removed
#
# A 'dequeued' event carrying $id_or_job is emitted when it has
# subscribers.
sub dequeue {
	my ($self, $id_or_job, $callback) = @_;

	# TODO option to not remove on dequeue?

	my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

	if($callback) {
		$self->collection->remove({'_id' => $id} => sub {
			$self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
			# Signal completion; previously the callback was never invoked
			$callback->();
		});
	} else {
		$self->collection->remove({'_id' => $id});
		$self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
	}
}

# Look up a job by ID without modifying it.
#
# $id_or_job - a job ID, or a job hash whose _id is used
# $callback  - optional; selects non-blocking mode and receives the job
#              document
#
# Blocking mode returns the document returned by find_one.
sub get {
	my ($self, $id_or_job, $callback) = @_;

	my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

	if($callback) {
		return $self->collection->find_one({'_id' => $id} => sub {
			my ($collection, $error, $doc) = @_;
			# Deliver the result; previously the callback was never invoked
			$callback->($doc);
		});
	} else {
		return $self->collection->find_one({'_id' => $id});
	}
}

# Write a job back to the collection, matching on its _id (upsert).
#
# $job      - job hash containing _id
# $callback - optional; selects non-blocking mode and receives the
#             result document of the update
#
# Blocking mode returns the result of the collection's update call.
sub update {
	my ($self, $job, $callback) = @_;

	if($callback) {
		# The non-blocking branch used to call find_one, which never wrote
		# the job; it now performs the same upsert as the blocking branch
		return $self->collection->update({'_id' => $job->{_id}}, $job, {upsert => 1} => sub {
			my ($collection, $error, $doc) = @_;
			$callback->($doc);
		});
	} else {
		return $self->collection->update({'_id' => $job->{_id}}, $job, {upsert => 1});
	}
}

# Fetch a single job from the queue.
#
#   fetch $queue status => 'Complete', sub { my $job = shift; }
#
# Any key/value arguments replace the default query from get_options. A
# trailing code reference selects non-blocking mode.
#
# Non-blocking mode schedules the fetch on the Mojo::IOLoop and returns a
# consumer ID. Blocking mode returns the result of _consume_blocking.
sub fetch {
	my ($self, @args) = @_;

	my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
	my %args;
	%args = (@args) if scalar @args;

	$self->log->debug("In fetch");

	if($callback) {
		$self->log->debug("Fetching in non-blocking mode");
		my $consumers = $self->consumers;
		my $consumer_id = (scalar keys %$consumers) + 1;
		# After a release, count + 1 may already be taken by a live
		# consumer; skip ahead to the next free ID
		$consumer_id++ while exists $consumers->{$consumer_id};
		$consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 1) });
		return $consumer_id;
	} else {
		$self->log->debug("Fetching in blocking mode");
		return $self->_consume_blocking(\%args, 1);
	}
}

# Consume jobs from the queue.
#
#   consume $queue status => 'Failed', sub { my $job = shift; }
#
# Any key/value arguments replace the default query from get_options. A
# trailing code reference selects non-blocking mode.
#
# Non-blocking mode schedules a repeating consumer on the Mojo::IOLoop and
# returns its consumer ID (see release). Blocking mode returns the result
# of _consume_blocking.
sub consume {
	my ($self, @args) = @_;

	my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
	my %args;
	%args = (@args) if scalar @args;

	$self->log->debug("In consume");

	if($callback) {
		$self->log->debug("consuming in non-blocking mode");
		my $consumers = $self->consumers;
		my $consumer_id = (scalar keys %$consumers) + 1;
		# After a release, count + 1 may already be taken by a live
		# consumer; skip ahead to the next free ID
		$consumer_id++ while exists $consumers->{$consumer_id};
		$consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 0) });
		$self->log->debug("Timer scheduled, consumer_id $consumer_id has timer ID: " . $consumers->{$consumer_id});
		return $consumer_id;
	} else {
		$self->log->debug("consuming in blocking mode");
		return $self->_consume_blocking(\%args, 0);
	}
}

# Stop a non-blocking consumer.
#
# $consumer_id - ID returned by consume (or fetch) in non-blocking mode
#
# Removes the consumer from the consumers map, so it is not rescheduled,
# and cancels its pending timer. Returns the timer ID that was stored for
# the consumer, or nothing if the ID is unknown.
sub release {
	my ($self, $consumer_id) = @_;

	# Unknown or already released consumer: nothing to do
	return unless defined $consumer_id && exists $self->consumers->{$consumer_id};

	my $timer = delete $self->consumers->{$consumer_id};

	$self->log->debug("Releasing consumer $consumer_id with timer ID: " . $timer);

	# Cancel the scheduled poll so no job is claimed after release
	Mojo::IOLoop->remove($timer);

	return $timer;
}


# Claim the next available job, blocking until one is found.
#
# $args  - hash reference; when non-empty it replaces the default query
#          from get_options
# $fetch - when true, make a single attempt and return without waiting
#
# Returns the job document as returned by find_and_modify, or nothing
# when $fetch is true and no job is available. A 'consumed' event is
# emitted for a claimed job when it has subscribers.
sub _consume_blocking {
	my ($self, $args, $fetch) = @_;

	while(1) {
		# Rebuilt on every pass so time-based criteria stay current
		my $opts = $self->get_options;
		$opts->{query} = $args if scalar keys %$args;

		my $doc = $self->collection->find_and_modify($opts);
		$self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

		if($doc) {
			$self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
			return $doc;
		} else {
			last if $fetch;
			# Pause between polls instead of spinning on the database
			$self->delay->wait;
		}
	}

	return;
}

# Non-blocking counterpart of _consume_blocking.
#
# $args        - hash reference; when non-empty it replaces the default
#                query from get_options
# $consumer_id - key into the consumers map; the consumer is only
#                rescheduled while this key exists
# $callback    - code reference invoked with each claimed job
# $fetch       - when true, make a single attempt and do not reschedule
#
# A 'consumed' event is emitted for a claimed job when it has subscribers.
sub _consume_nonblocking {
	my ($self, $args, $consumer_id, $callback, $fetch) = @_;

	my $opts = $self->get_options;
	$opts->{query} = $args if scalar keys %$args;

	$self->collection->find_and_modify($opts => sub {
		my ($cursor, $err, $doc) = @_;
		$self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));
		if($doc) {
			$self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
			# The job is already marked as retrieved at this point, so it
			# is always delivered; previously the callback was never invoked
			$callback->($doc);
			return unless Mojo::IOLoop->is_running;
			return if $fetch;
			return unless exists $self->consumers->{$consumer_id};
			$self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) });
			$self->log->debug("Timer rescheduled (recursive immediate), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
		} else {
			if($fetch) {
				# NOTE(review): a one-shot fetch reports "no job" with
				# undef so the caller is not left waiting - confirm
				$callback->(undef);
				return;
			}
			return unless Mojo::IOLoop->is_running;
			$self->delay->wait(sub {
				return unless exists $self->consumers->{$consumer_id};
				$self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) });
				$self->log->debug("Timer rescheduled (recursive delayed), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
			});
			return undef;
		}
	});
}

# A module must finish with a true value to be loadable via require/use
1;


=encoding utf8

=head1 NAME

MangoX::Queue - A MongoDB queue implementation using Mango

=head1 DESCRIPTION


L<MangoX::Queue> is a MongoDB backed queue implementation using L<Mango> to support
blocking and non-blocking queues.

L<MangoX::Queue> makes no attempt to handle the L<Mango> connection, database or
collection - pass in a collection to the constructor and L<MangoX::Queue> will
use it. The collection can be plain, capped or sharded.


=head1 SYNOPSIS

	use Mango;
	use MangoX::Queue;

	my $mango = Mango->new("mongodb://localhost:27017");
	my $collection = $mango->db('my_db')->collection('my_queue');

	my $queue = MangoX::Queue->new(collection => $collection);

	# To add a job
	my $id = enqueue $queue 'test'; # Blocking
	enqueue $queue 'test' => sub { my $id = shift; }; # Non-blocking

	# To set options
	my $id = enqueue $queue priority => 1, created => DateTime->now, 'test'; # Blocking
	enqueue $queue priority => 1, created => DateTime->now, 'test' => sub { my $id = shift; }; # Non-blocking

	# To watch for a specific job status
	watch $queue $id; # Blocking
	watch $queue $id, 'Complete' => sub { # Non-blocking
		# Job status is 'Complete'
	};

	# To fetch a job
	my $job = fetch $queue; # Blocking
	fetch $queue sub { # Non-blocking
		my ($job) = @_;
		# ...
	};

	# To get a job by id
	my $job = get $queue $id; # Blocking
	get $queue $id => sub { my $job = shift; }; # Non-blocking

	# To requeue a job
	my $id = requeue $queue $job; # Blocking
	requeue $queue $job => sub { my $id = shift; }; # Non-blocking

	# To dequeue a job
	dequeue $queue $id; # Blocking
	dequeue $queue $id => sub { }; # Non-blocking

	# To consume a queue
	while(my $job = consume $queue) { # Blocking
		# ...
	}
	my $consumer = consume $queue sub { # Non-blocking
		my ($job) = @_;
		# ...
	};

	# To stop consuming a queue
	release $queue $consumer;

	# To listen for events
	on $queue enqueued => sub { my ($queue, $job) = @_; };
	on $queue dequeued => sub { my ($queue, $job) = @_; };
	on $queue consumed => sub { my ($queue, $job) = @_; };

	# To register a plugin
	plugin $queue 'MangoX::Queue::Plugin::Statsd';

=head1 ATTRIBUTES


L<MangoX::Queue> implements the following attributes.

=head2 collection

    my $collection = $queue->collection;

    my $queue = MangoX::Queue->new(collection => $collection);

The L<Mango::Collection> representing the MongoDB queue collection.

=head2 delay

	my $delay = $queue->delay;

The L<MangoX::Queue::Delay> responsible for dynamically controlling the
delay between queue queries.

=head2 plugins

	my $plugins = $queue->plugins;

Returns a hash containing the plugins registered with this queue.

=head2 retries

	my $retries = $queue->retries;

The number of times a job will be picked up from the queue before it is
marked as failed.

=head2 timeout

	my $timeout = $queue->timeout;

The time (in seconds) a job is allowed to stay in Retrieved state before
it is released back into Pending state. Defaults to 60 seconds.

=head1 EVENTS

L<MangoX::Queue> inherits from L<Mojo::EventEmitter> and emits the following events

=head2 consumed

	on $queue consumed => sub {
		my ($queue, $job) = @_;
		# ...
	};

Emitted when an item is consumed (either via consume or fetch)

=head2 dequeued

	on $queue dequeued => sub {
		my ($queue, $job) = @_;
		# ...
	};

Emitted when an item is dequeued

=head2 enqueued

	on $queue enqueued => sub {
		my ($queue, $job) = @_;
		# ...
	};

Emitted when an item is enqueued

=head1 METHODS

L<MangoX::Queue> implements the following methods.

=head2 consume

	# In blocking mode
	while(my $job = consume $queue) {
		# ...
	}
	while(my $job = $queue->consume) {
		# ...
	}

	# In non-blocking mode
	consume $queue sub {
		my ($job) = @_;
		# ...
	};
	$queue->consume(sub {
		my ($job) = @_;
		# ...
	});

Waits for jobs to arrive on the queue, sleeping between queue checks using L<MangoX::Queue::Delay> or L<Mojo::IOLoop>.

Currently sets the status to 'Retrieved' before returning the job.

=head2 dequeue

	my $job = fetch $queue;
	dequeue $queue $job;

Dequeues a job. Currently removes it from the collection.

=head2 enqueue

	enqueue $queue 'job name';
	enqueue $queue [ 'some', 'data' ];
	enqueue $queue +{ foo => 'bar' };

	$queue->enqueue('job name');
	$queue->enqueue([ 'some', 'data' ]);
	$queue->enqueue({ foo => 'bar' });

Add an item to the queue.

Currently uses priority 1 with a job status of 'Pending'.

=head2 fetch

	# In blocking mode
	my $job = fetch $queue;
	my $job = $queue->fetch;

	# In non-blocking mode
	fetch $queue sub {
		my ($job) = @_;
		# ...
	};
	$queue->fetch(sub {
		my ($job) = @_;
		# ...
	});

Fetch a single job from the queue, returning undef if no jobs are available.

Currently sets job status to 'Retrieved'.

=head2 get

	my $job = get $queue $id;

Gets a job from the queue by ID. Doesn't change the job status.

=head2 get_options

	my $options = $queue->get_options;

Returns the L<Mango::Collection> options hash used by find_and_modify to
identify and update available queue items.

Wait for a job to enter a certain status.

=head2 release

	my $consumer = consume $queue sub {
		# ...
	};
	release $queue $consumer;

Releases a non-blocking consumer from watching a queue.

=head2 requeue

	my $job = fetch $queue;
	requeue $queue $job;

Requeues a job. Sets the job status to 'Pending'.

=head2 update

	my $job = fetch $queue;
	$job->{status} = 'Failed';
	update $queue $job;

Updates a job in the queue.

=head2 watch

	# In blocking mode
	my $id = enqueue $queue 'test';
	watch $queue $id, 'Complete'; # blocks until job is complete

	# In non-blocking mode
	my $id = enqueue $queue 'test';
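	watch $queue $id, 'Complete' => sub {
		# ...
	};

Waits for a job to reach the given status, which defaults to 'Complete'.
The status may also be an array reference of acceptable statuses.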

=head1 SEE ALSO

L<Mojolicious>, L<Mango>
