Queue Management
Understanding and managing the custom queue system for reliable background job processing.
Overview
GSM Middleware uses a custom queue system for processing background jobs. It is designed to be more reliable than WordPress Action Scheduler and to give finer control over job execution, retries, and monitoring.
Why a Custom Queue?
Advantages Over Action Scheduler
- Better Reliability: Custom retry logic with exponential backoff
- Priority Support: High-priority jobs processed first
- Stuck Job Detection: Automatic recovery of hung jobs
- Better Monitoring: Comprehensive statistics and job tracking
- Flexible Scheduling: Support for delayed jobs
- Database Agnostic: Works with any database (not just WordPress)
- Idempotent Enqueuing: enqueue_unique() prevents duplicate jobs by reference_id (v1.8.0)
Queue Architecture
Job Flow
┌──────────────┐
│ Enqueue Job │
└──────┬───────┘
│
▼
┌──────────────────┐
│ rm_queue Table │ ← Job stored with priority
└──────┬───────────┘
│
▼
┌───────────────────────┐
│ Queue Processor │ ← Runs every 5 minutes
│ (Cron) │
└──────┬────────────────┘
│
▼
┌──────────────────────────┐
│ Dequeue & Process │ ← Highest priority first
│ - Inventory Sync │
│ - Webhook Processing │
│ - Other Jobs │
└──────┬───────────────────┘
│
├─Success─────► Mark Complete
│
└─Failure─────► Retry or Mark Failed
Job Types
Inventory Sync
- Type: inventory_sync
- Priority: 5 (normal)
- Payload: {site_id: 5}
- Max Attempts: 3
- Typical Duration: 5-20 minutes
Product Webhook
- Type: product_webhook
- Priority: 8 (high)
- Payload: {webhook_log_id: 123, site_id: 5, event_type: "store/product/created", payload: {...}}
- Max Attempts: 3
- Typical Duration: < 1 second
PayArc Dispute
- Type: payarc_dispute
- Priority: 7 (high-normal)
- Payload: {webhook_id: 42}
- Reference ID: payarc_case_{case_id} (idempotency key)
- Max Attempts: 3
- Typical Duration: 1-3 seconds
Idempotent Enqueuing
The enqueue_unique() method prevents duplicate jobs for the same business entity by enforcing a reference_id uniqueness check before inserting.
When the same event can trigger multiple webhook deliveries (e.g. PayArc retries a dispute notification), enqueue_unique() guarantees that only one active job exists for a given reference_id at any time.
How It Works
- Caller passes a reference_id string, the unique key for the business entity (e.g. "payarc_case_12345").
- The method queries rm_queue for any existing row with that reference_id where status IN ('pending', 'processing').
- If found: returns null. The duplicate is silently acknowledged with no insert.
- If not found: inserts normally and returns the new job ID.
- On DB error: returns false.
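The steps above can be sketched without a database. This is a minimal illustration, not the plugin's code: an in-memory array stands in for rm_queue, the function name enqueue_unique_sketch() is hypothetical, and the false return for a database error is left out because an array insert cannot fail that way.

```php
<?php
// Sketch only: an in-memory array stands in for the rm_queue table,
// and enqueue_unique_sketch() is a hypothetical name, not the plugin's method.

/**
 * @param array<int, array<string, mixed>> $table Rows keyed by job ID.
 * @return int|null New job ID, or null when an active job already holds the key.
 */
function enqueue_unique_sketch(array &$table, string $job_type, array $payload, string $reference_id, int $priority = 5): ?int
{
    // Step 2: look for an active job (pending or processing) with the same key.
    foreach ($table as $row) {
        if ($row['reference_id'] === $reference_id
            && in_array($row['status'], ['pending', 'processing'], true)) {
            return null; // Step 3: duplicate, so nothing is inserted.
        }
    }

    // Step 4: no active job found, insert and return the new ID.
    $job_id = count($table) === 0 ? 1 : max(array_keys($table)) + 1;
    $table[$job_id] = [
        'job_type'     => $job_type,
        'payload'      => json_encode($payload),
        'reference_id' => $reference_id,
        'priority'     => $priority,
        'status'       => 'pending',
    ];

    return $job_id;
}

// Two deliveries of the same dispute: only the first creates a job.
$table  = [];
$first  = enqueue_unique_sketch($table, 'payarc_dispute', ['webhook_id' => 42], 'payarc_case_12345', 7);
$second = enqueue_unique_sketch($table, 'payarc_dispute', ['webhook_id' => 43], 'payarc_case_12345', 7);
```

A real implementation would run the lookup and the insert against the database, where two concurrent requests can both pass the lookup; that race is outside the scope of this sketch.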
Return Values
| Return | Meaning |
|---|---|
| int | New job ID: successfully queued |
| null | Already queued: idempotent no-op |
| false | Database error |
Reference ID Conventions
| Job Type | Reference ID Format | Example |
|---|---|---|
| payarc_dispute | payarc_case_{case_id} | payarc_case_CASE-12345 |
| Custom | Any unique string per entity | inv_sync_site_5_2026-03-26 |
Use enqueue_unique() instead of enqueue() any time a webhook receiver or event handler can fire more than once for the same underlying job.
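Building the key in one place keeps every caller on the same format. The helpers below are a hypothetical sketch: the function names are illustrative, and only the resulting string formats come from the conventions table.

```php
<?php
// Hypothetical helpers: the function names are illustrative, only the
// resulting formats come from the Reference ID Conventions table.

function payarc_reference_id(string $case_id): string
{
    return 'payarc_case_' . $case_id;
}

function inventory_sync_reference_id(int $site_id, DateTimeInterface $day): string
{
    // One key per site per calendar day.
    return sprintf('inv_sync_site_%d_%s', $site_id, $day->format('Y-m-d'));
}

$dispute_key = payarc_reference_id('CASE-12345');
$sync_key    = inventory_sync_reference_id(5, new DateTimeImmutable('2026-03-26'));
```

The resulting strings can be passed as the reference_id argument of enqueue_unique().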
Database Column
Migration 007_add_reference_id_to_queue.php (v1.8.0) adds:
ALTER TABLE `rm_queue`
ADD COLUMN `reference_id` VARCHAR(255) NULL
COMMENT 'Optional idempotency key — unique per active job'
AFTER `site_id`,
ADD UNIQUE KEY `idx_reference_id` (`reference_id`);
The column is nullable: jobs created with enqueue() leave it NULL.
Queue States
Pending
Job is waiting to be processed.
SELECT * FROM rm_queue
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC;
Processing
Job is currently being executed.
SELECT * FROM rm_queue
WHERE status = 'processing';
Completed
Job finished successfully.
SELECT * FROM rm_queue
WHERE status = 'completed'
ORDER BY completed_at DESC
LIMIT 100;
Failed
Job failed after max retries.
SELECT * FROM rm_queue
WHERE status = 'failed'
ORDER BY failed_at DESC;
Priority System
Jobs are processed by priority (1-10), with higher numbers processed first.
| Priority | Description | Job Types |
|---|---|---|
| 10 | Critical | Emergency operations |
| 8-9 | High | Webhooks, real-time updates |
| 5-7 | Normal | Scheduled syncs |
| 1-4 | Low | Cleanup, maintenance |
Example Priority Usage
// High priority - webhook
$queue->enqueue('product_webhook', $payload, 8);
// Normal priority - scheduled sync
$queue->enqueue('inventory_sync', $payload, 5);
// Low priority - log cleanup
$queue->enqueue('cleanup', $payload, 2);
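To make the ordering concrete, the sketch below sorts a handful of pending jobs the same way as the ORDER BY priority DESC, created_at ASC clause used in this document. It works on plain arrays; pending_in_processing_order() is a hypothetical helper and the sample rows are made up for illustration.

```php
<?php
// Sketch only: rows are plain arrays using the rm_queue column names from
// this document; pending_in_processing_order() is a hypothetical helper.

function pending_in_processing_order(array $jobs): array
{
    // Only pending jobs are candidates for processing.
    $pending = array_filter($jobs, function (array $job): bool {
        return $job['status'] === 'pending';
    });

    // Highest priority first; oldest first within the same priority.
    usort($pending, function (array $a, array $b): int {
        return [$b['priority'], $a['created_at']] <=> [$a['priority'], $b['created_at']];
    });

    return $pending;
}

$jobs = [
    ['id' => 1, 'job_type' => 'inventory_sync',  'priority' => 5, 'status' => 'pending',   'created_at' => '2026-03-26 10:00:00'],
    ['id' => 2, 'job_type' => 'cleanup',         'priority' => 2, 'status' => 'pending',   'created_at' => '2026-03-26 09:00:00'],
    ['id' => 3, 'job_type' => 'product_webhook', 'priority' => 8, 'status' => 'pending',   'created_at' => '2026-03-26 10:05:00'],
    ['id' => 4, 'job_type' => 'product_webhook', 'priority' => 8, 'status' => 'pending',   'created_at' => '2026-03-26 10:01:00'],
    ['id' => 5, 'job_type' => 'inventory_sync',  'priority' => 5, 'status' => 'completed', 'created_at' => '2026-03-26 08:00:00'],
];

$ordered_ids = array_column(pending_in_processing_order($jobs), 'id');
```

Both webhooks run before the sync even though the cleanup job is the oldest row, and the older of the two webhooks goes first.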
Retry Logic
Exponential Backoff
Failed jobs are automatically retried with increasing delays. The delay doubles with each retry, and a job is marked failed once it exhausts its max attempts:
| Attempt | Delay | Status |
|---|---|---|
| 1 | 0 min | Immediate |
| 2 | 2 min | Retry |
| 3 | 4 min | Retry |
| 4 | 8 min | Final attempt |
| 5+ | - | Failed |
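The delays in the table follow a doubling pattern. The sketch below reproduces them under the assumption that attempt n waits 2^(n-1) minutes, which is inferred from the table rather than taken from the plugin source; both function names are hypothetical.

```php
<?php
// Assumption: the delay doubles per retry, inferred from the table above
// (2, 4, 8 minutes). Both helpers are hypothetical.

function retry_delay_minutes(int $attempt): int
{
    if ($attempt < 1) {
        throw new InvalidArgumentException('Attempt numbers start at 1.');
    }
    // The first attempt runs immediately; attempt n waits 2^(n-1) minutes.
    return $attempt === 1 ? 0 : 2 ** ($attempt - 1);
}

/**
 * @param int $next_attempt Number of the attempt being scheduled.
 */
function next_scheduled_at(int $next_attempt, DateTimeImmutable $failed_at): string
{
    $delay = retry_delay_minutes($next_attempt);
    return $failed_at->modify("+{$delay} minutes")->format('Y-m-d H:i:s');
}

$delays   = array_map('retry_delay_minutes', [1, 2, 3, 4]);
$retry_at = next_scheduled_at(3, new DateTimeImmutable('2026-03-26 10:00:00'));
```

Here $delays holds the Delay column for attempts 1 through 4, and $retry_at is the value a rescheduled job would carry in scheduled_at before its third attempt.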
Scheduled Retry
When a job fails, it's rescheduled with a scheduled_at timestamp:
SELECT
id,
job_type,
attempts,
scheduled_at,
error_message
FROM rm_queue
WHERE status = 'pending'
AND scheduled_at > NOW()
ORDER BY scheduled_at;
Monitoring
Queue Statistics
Via REST API
curl -X GET \
'https://your-site.com/wp-json/gsm-middleware/v1/inventory/stats'
Response:
{
"queue": {
"total": 150,
"pending": 5,
"processing": 2,
"completed": 140,
"failed": 3
}
}
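A monitoring script can turn this response into a couple of numbers worth alerting on. The sketch below decodes a copy of the sample response; failure_rate() is a hypothetical helper, and counting only finished jobs in the denominator is one possible definition, not the plugin's.

```php
<?php
// Sketch only: the JSON literal mirrors the sample response above, and
// failure_rate() is a hypothetical helper, not part of the plugin.

function failure_rate(array $queue_stats): float
{
    // Only finished jobs count: completed plus failed.
    $finished = $queue_stats['completed'] + $queue_stats['failed'];
    return $finished === 0 ? 0.0 : $queue_stats['failed'] / $finished;
}

$json = '{"queue":{"total":150,"pending":5,"processing":2,"completed":140,"failed":3}}';
$response = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

$rate    = failure_rate($response['queue']);                                   // share of finished jobs that failed
$backlog = $response['queue']['pending'] + $response['queue']['processing'];  // jobs not yet finished
```

JSON_THROW_ON_ERROR requires PHP 7.3 or later; on older versions, check json_last_error() instead.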
Via Database
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
FROM rm_queue
WHERE created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR);
Jobs by Type
SELECT
job_type,
COUNT(*) as total,
AVG(TIMESTAMPDIFF(SECOND, started_at, completed_at)) as avg_duration_sec
FROM rm_queue
WHERE status = 'completed'
AND completed_at > DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY job_type;
Recent Failures
SELECT
id,
job_type,
error_message,
attempts,
failed_at
FROM rm_queue
WHERE status = 'failed'
ORDER BY failed_at DESC
LIMIT 20;
Stuck Job Detection
Jobs stuck in "processing" state for > 30 minutes are automatically released by hourly maintenance.
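The rule can be expressed as a simple cutoff comparison. The sketch below applies it to plain arrays; stuck_job_ids() is a hypothetical helper and the sample rows are invented, so treat it as an illustration of the 30-minute threshold rather than the maintenance job itself.

```php
<?php
// Sketch only: stuck_job_ids() is a hypothetical helper that applies the
// "processing for longer than N minutes" rule to plain arrays.

function stuck_job_ids(array $jobs, int $timeout_minutes, DateTimeImmutable $now): array
{
    // Anything that started before this moment has exceeded the timeout.
    $cutoff = $now->modify("-{$timeout_minutes} minutes");

    $ids = [];
    foreach ($jobs as $job) {
        if ($job['status'] === 'processing'
            && new DateTimeImmutable($job['started_at']) < $cutoff) {
            $ids[] = $job['id'];
        }
    }

    return $ids;
}

$jobs = [
    ['id' => 1, 'status' => 'processing', 'started_at' => '2026-03-26 11:15:00'], // 45 minutes ago
    ['id' => 2, 'status' => 'processing', 'started_at' => '2026-03-26 11:50:00'], // 10 minutes ago
    ['id' => 3, 'status' => 'completed',  'started_at' => '2026-03-26 09:00:00'], // finished, ignored
];

$stuck = stuck_job_ids($jobs, 30, new DateTimeImmutable('2026-03-26 12:00:00'));
```

Inventory syncs typically run for 5-20 minutes, so a threshold well below 30 minutes would risk releasing jobs that are still working.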
Manual Release
UPDATE rm_queue
SET status = 'pending',
error_message = 'Released due to timeout'
WHERE status = 'processing'
AND started_at < DATE_SUB(NOW(), INTERVAL 30 MINUTE);
Via WP-CLI
wp eval "
global \$wpdb;
\$queue = new \GSM\Middleware\Queue\Queue_System(\$wpdb);
echo \$queue->release_stuck_jobs(30) . ' jobs released';
"
Maintenance
Automatic Cleanup
The queue system automatically cleans up old completed jobs:
- Frequency: Hourly
- Retention: 7 days (completed jobs)
- Action: gsm_queue_maintenance cron job
Manual Cleanup
-- Delete completed jobs older than 7 days
DELETE FROM rm_queue
WHERE status = 'completed'
AND completed_at < DATE_SUB(NOW(), INTERVAL 7 DAY);
Via WP-CLI
wp eval "
global \$wpdb;
\$queue = new \GSM\Middleware\Queue\Queue_System(\$wpdb);
echo \$queue->cleanup_old_jobs(7) . ' jobs deleted';
"
Manual Job Management
Retry Failed Job
UPDATE rm_queue
SET status = 'pending',
attempts = 0,
scheduled_at = NULL,
error_message = NULL
WHERE id = 123;
Cancel Job
UPDATE rm_queue
SET status = 'failed',
error_message = 'Manually cancelled'
WHERE id = 123;
Inspect Job Payload
SELECT
id,
job_type,
payload,
created_at
FROM rm_queue
WHERE id = 123;
The payload is stored as JSON:
{
"site_id": 5,
"webhook_log_id": 456
}
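When reading the payload column by hand or in a script, decoding it defensively avoids silent nulls from malformed JSON. This is a minimal sketch: decode_payload() is a hypothetical helper, and the literal below copies the example payload.

```php
<?php
// Sketch only: decode_payload() is a hypothetical helper for reading the
// JSON stored in the payload column.

function decode_payload(string $json): array
{
    // Throws JsonException on malformed input instead of returning null.
    $payload = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($payload)) {
        throw new UnexpectedValueException('Payload must decode to a JSON object.');
    }

    return $payload;
}

$payload = decode_payload('{"site_id": 5, "webhook_log_id": 456}');
$site_id = $payload['site_id'] ?? null;        // null when the key is absent
$log_id  = $payload['webhook_log_id'] ?? null;
```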
Troubleshooting
Queue Not Processing
- Check cron is running:
  wp cron event list | grep gsm_process_queue
- Verify cron schedule:
  - Should run every 5 minutes
  - Check the recurrence column in the wp cron event list output, or list the registered intervals with wp cron schedule list
- Test manual processing:
  wp cron event run gsm_process_queue
- Check for errors:
  tail -f /var/log/wordpress/debug.log | grep "GSM Queue"
Jobs Failing Repeatedly
- Check error messages:
  SELECT
    id,
    job_type,
    error_message,
    attempts,
    payload
  FROM rm_queue
  WHERE status = 'failed'
  ORDER BY failed_at DESC
  LIMIT 10;
- Common issues:
  - API credentials expired
  - Network connectivity
  - Database connection issues
  - Memory limits
- Fix and retry:
  - Resolve the underlying issue
  - Reset job to pending:
    UPDATE rm_queue
    SET status = 'pending',
        attempts = 0,
        error_message = NULL
    WHERE id = 123;
High Queue Backlog
- Check processing rate:
  SELECT
    DATE(created_at) as date,
    COUNT(*) as jobs_created,
    SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as jobs_completed
  FROM rm_queue
  WHERE created_at > DATE_SUB(NOW(), INTERVAL 7 DAY)
  GROUP BY DATE(created_at);
- Increase processing frequency:
  - Queue runs every 5 minutes by default
  - Can be increased to every 2-3 minutes if needed
- Process more jobs per run:
  - Default: 10 jobs per run
  - Can be increased in code if needed
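A rough estimate shows how these two settings affect a backlog. The sketch below assumes every run processes a full batch and no new jobs arrive in the meantime, so the result is a best case; drain_time_minutes() is a hypothetical helper.

```php
<?php
// Sketch only: drain_time_minutes() is a hypothetical helper. It assumes
// each run processes a full batch and that no new jobs arrive meanwhile.

function drain_time_minutes(int $backlog, int $jobs_per_run = 10, int $interval_minutes = 5): int
{
    if ($backlog < 0 || $jobs_per_run < 1 || $interval_minutes < 1) {
        throw new InvalidArgumentException('Backlog must be >= 0; batch size and interval must be >= 1.');
    }

    // Number of runs needed, rounded up to a whole run.
    $runs = intdiv($backlog + $jobs_per_run - 1, $jobs_per_run);

    return $runs * $interval_minutes;
}

$default = drain_time_minutes(150);        // defaults: 10 jobs every 5 minutes
$tuned   = drain_time_minutes(150, 20, 2); // 20 jobs every 2 minutes
```

With the defaults the queue handles at most 120 jobs per hour, so a sustained arrival rate above that will grow the backlog regardless of how long it runs.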
Memory Issues
If jobs are failing with memory errors:
- Check memory limit:
  php -i | grep memory_limit
- Increase PHP memory limit:
  // In wp-config.php
  define('WP_MEMORY_LIMIT', '512M');
  define('WP_MAX_MEMORY_LIMIT', '512M');
- Process fewer items per job:
  - Reduce batch size in inventory sync
  - Split large operations into smaller jobs
Performance Tuning
Optimize Query Performance
Ensure indexes are created:
-- Check indexes
SHOW INDEX FROM rm_queue;
-- Should have indexes on:
-- - status, priority, created_at (composite)
-- - job_type
-- - site_id
-- - scheduled_at
Monitor Query Performance
EXPLAIN SELECT *
FROM rm_queue
WHERE status = 'pending'
AND (scheduled_at IS NULL OR scheduled_at <= NOW())
ORDER BY priority DESC, created_at ASC
LIMIT 10;
Database Maintenance
-- Optimize table
OPTIMIZE TABLE rm_queue;
-- Analyze table
ANALYZE TABLE rm_queue;
Best Practices
- Keep Payloads Small
  - Store references, not full data
  - Link to webhook logs instead of duplicating payloads
- Set Appropriate Priorities
  - Use high priority for time-sensitive jobs
  - Use normal priority for scheduled operations
  - Use low priority for cleanup tasks
- Monitor Regularly
  - Check queue stats daily
  - Set up alerts for high failure rates
  - Review stuck jobs weekly
- Clean Up Regularly
  - Automatic cleanup keeps last 7 days
  - Manual cleanup for older jobs if needed
  - Archive important job data before deletion
- Handle Failures Gracefully
  - Log detailed error messages
  - Don't retry indefinitely
  - Alert on repeated failures
API Reference
Enqueue Job
use GSM\Middleware\Queue\Queue_System;
global $wpdb;
$queue = new Queue_System($wpdb);
$job_id = $queue->enqueue(
    'inventory_sync',  // Job type
    ['site_id' => 5],  // Payload
    5,                 // Priority (1-10)
    5                  // Site ID (optional)
);
Enqueue Unique Job (Idempotent)
Use when the same event may be received multiple times and only one active job should exist:
$result = $queue->enqueue_unique(
    'payarc_dispute',          // Job type
    ['webhook_id' => 42],      // Payload
    'payarc_case_CASE-12345',  // reference_id (idempotency key)
    7,                         // Priority (1-10)
    null                       // Site ID (optional)
);
if ( null === $result ) {
    // Already queued: safe to acknowledge and move on.
} elseif ( false === $result ) {
    // Database error: log and handle.
} else {
    // $result is the new job ID (int).
}
Process Queue
$processed = $queue->process(
    function($job) {
        // Handle job
        $site_id = $job['payload']['site_id'];
        // ... process job
    },
    'inventory_sync',  // Job type filter (optional)
    10                 // Max jobs to process
);
Get Statistics
$stats = $queue->get_stats();
// Returns an array with the keys: total, pending, processing, completed, failed
Cleanup Old Jobs
$deleted = $queue->cleanup_old_jobs(7); // Days
echo "$deleted jobs deleted";
Release Stuck Jobs
$released = $queue->release_stuck_jobs(30); // Minutes
echo "$released jobs released";
Related Documentation
- Webhooks - Webhook processing via queue
- Performance Tuning - Optimize queue performance
- Cron Jobs - Scheduled queue processing