Queue Management

Understanding and managing the custom queue system for reliable background job processing.

Overview

GSM Middleware uses a custom queue system for processing background jobs. This system is more reliable than WordPress Action Scheduler and provides better control over job execution, retries, and monitoring.

Why a Custom Queue?

Advantages Over Action Scheduler

  • Better Reliability: Custom retry logic with exponential backoff
  • Priority Support: High-priority jobs processed first
  • Stuck Job Detection: Automatic recovery of hung jobs
  • Better Monitoring: Comprehensive statistics and job tracking
  • Flexible Scheduling: Support for delayed jobs
  • Database Agnostic: Works with any database (not just WordPress)
  • Idempotent Enqueuing: enqueue_unique() prevents duplicate jobs by reference_id (v1.8.0)

Queue Architecture

Job Flow

┌──────────────────┐
│   Enqueue Job    │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  rm_queue Table  │ ← Job stored with priority
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ Queue Processor  │ ← Runs every 5 minutes
│     (Cron)       │
└────────┬─────────┘
         │
         ▼
┌──────────────────────────┐
│ Dequeue & Process        │ ← Highest priority first
│  - Inventory Sync        │
│  - Webhook Processing    │
│  - Other Jobs            │
└────────┬─────────────────┘
         │
         ├─Success─────► Mark Complete
         │
         └─Failure─────► Retry or Mark Failed
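
The success and failure branches at the bottom of the flow can be read as a small state transition. The sketch below is an in-memory illustration only: the function name example_next_status() and the array keys are hypothetical, and the plugin's real implementation may differ.

```php
<?php
// Hypothetical helper (not part of the plugin's API): decide a job's next
// status after one processing attempt.
// $job['attempts'] counts attempts made so far, including the one that just ran.
function example_next_status(array $job, bool $succeeded): string
{
    if ($succeeded) {
        return 'completed';
    }
    // Failure: go back to 'pending' while attempts remain, otherwise give up.
    return $job['attempts'] < $job['max_attempts'] ? 'pending' : 'failed';
}

echo example_next_status(['attempts' => 1, 'max_attempts' => 3], false), "\n"; // pending
echo example_next_status(['attempts' => 3, 'max_attempts' => 3], false), "\n"; // failed
echo example_next_status(['attempts' => 2, 'max_attempts' => 3], true), "\n";  // completed
```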

Job Types

Inventory Sync

  • Type: inventory_sync
  • Priority: 5 (normal)
  • Payload: {site_id: 5}
  • Max Attempts: 3
  • Typical Duration: 5-20 minutes

Product Webhook

  • Type: product_webhook
  • Priority: 8 (high)
  • Payload: {webhook_log_id: 123, site_id: 5, event_type: "store/product/created", payload: {...}}
  • Max Attempts: 3
  • Typical Duration: < 1 second

PayArc Dispute

  • Type: payarc_dispute
  • Priority: 7 (high-normal)
  • Payload: {webhook_id: 42}
  • Reference ID: payarc_case_{case_id} (idempotency key)
  • Max Attempts: 3
  • Typical Duration: 1-3 seconds

Idempotent Enqueuing

Added in v1.8.0

The enqueue_unique() method prevents duplicate jobs for the same business entity by enforcing a reference_id uniqueness check before inserting.

When the same event can trigger multiple webhook deliveries (e.g. PayArc retries a dispute notification), enqueue_unique() guarantees that only one active job exists for a given reference_id at any time.

How It Works

  1. Caller passes a reference_id string — the unique key for the business entity (e.g. "payarc_case_12345").
  2. The method queries rm_queue for any existing row with that reference_id where status IN ('pending', 'processing').
  3. If found: returns null — the duplicate is silently acknowledged with no insert.
  4. If not found: inserts normally and returns the new job ID.
  5. On DB error: returns false.
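
The steps above can be sketched against an in-memory array standing in for rm_queue. This is an illustration of the check-then-insert logic under stated assumptions, not the plugin's code: example_enqueue_unique() and the row layout are hypothetical, and the database-error branch (false) is omitted because an array cannot fail that way.

```php
<?php
// In-memory stand-in for the rm_queue table; illustrative only.
// Returns int (new job ID) or null (an active job already holds the reference_id).
function example_enqueue_unique(array &$table, string $job_type, array $payload, string $reference_id): ?int
{
    foreach ($table as $row) {
        $active = in_array($row['status'], ['pending', 'processing'], true);
        if ($active && $row['reference_id'] === $reference_id) {
            return null; // Duplicate of an active job: no insert.
        }
    }
    $id = count($table) + 1;
    $table[] = [
        'id'           => $id,
        'job_type'     => $job_type,
        'payload'      => $payload,
        'reference_id' => $reference_id,
        'status'       => 'pending',
    ];
    return $id;
}

$table = [];
var_dump(example_enqueue_unique($table, 'payarc_dispute', ['webhook_id' => 42], 'payarc_case_12345')); // int(1)
var_dump(example_enqueue_unique($table, 'payarc_dispute', ['webhook_id' => 43], 'payarc_case_12345')); // NULL
```

A check followed by an insert is not atomic on its own, so two simultaneous requests could in principle both pass the check; a unique index on the column acts as the backstop in that case.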

Return Values

Return   Meaning
int      New job ID (successfully queued)
null     Already queued (idempotent no-op)
false    Database error

Reference ID Conventions

Job Type         Reference ID Format            Example
payarc_dispute   payarc_case_{case_id}          payarc_case_CASE-12345
Custom           Any unique string per entity   inv_sync_site_5_2026-03-26

Tip: Use enqueue_unique() instead of enqueue() whenever a webhook receiver or event handler can fire more than once for the same underlying job.

Database Column

Migration 007_add_reference_id_to_queue.php (v1.8.0) adds:

ALTER TABLE `rm_queue`
ADD COLUMN `reference_id` VARCHAR(255) NULL
COMMENT 'Optional idempotency key — unique per active job'
AFTER `site_id`,
ADD UNIQUE KEY `idx_reference_id` (`reference_id`);

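The column is nullable, and jobs created with enqueue() leave it NULL. MySQL unique indexes permit any number of NULL values, so those rows never collide. Note that the unique key as written covers rows in every status, not only pending and processing ones, so a completed or failed row that still holds a reference_id would be expected to block a new insert with the same value until that row is cleaned up.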

Queue States

Pending

Job is waiting to be processed.

SELECT * FROM rm_queue 
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC;

Processing

Job is currently being executed.

SELECT * FROM rm_queue 
WHERE status = 'processing';

Completed

Job finished successfully.

SELECT * FROM rm_queue 
WHERE status = 'completed'
ORDER BY completed_at DESC
LIMIT 100;

Failed

Job failed after max retries.

SELECT * FROM rm_queue 
WHERE status = 'failed'
ORDER BY failed_at DESC;

Priority System

Jobs are processed by priority (1-10), with higher numbers processed first.

Priority   Description   Job Types
10         Critical      Emergency operations
8-9        High          Webhooks, real-time updates
5-7        Normal        Scheduled syncs
1-4        Low           Cleanup, maintenance

Example Priority Usage

// High priority - webhook
$queue->enqueue('product_webhook', $payload, 8);

// Normal priority - scheduled sync
$queue->enqueue('inventory_sync', $payload, 5);

// Low priority - log cleanup
$queue->enqueue('cleanup', $payload, 2);
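
To make the ordering concrete, the sketch below sorts a few made-up jobs the same way the pending-jobs query does (priority DESC, then created_at ASC). The sample rows are invented for illustration, and the real queue does this ordering in SQL rather than in PHP.

```php
<?php
// Sample rows are made up; only the ordering rule comes from the documentation.
$jobs = [
    ['id' => 1, 'job_type' => 'cleanup',         'priority' => 2, 'created_at' => '2026-03-26 10:00:00'],
    ['id' => 2, 'job_type' => 'inventory_sync',  'priority' => 5, 'created_at' => '2026-03-26 10:01:00'],
    ['id' => 3, 'job_type' => 'product_webhook', 'priority' => 8, 'created_at' => '2026-03-26 10:02:00'],
    ['id' => 4, 'job_type' => 'product_webhook', 'priority' => 8, 'created_at' => '2026-03-26 10:00:30'],
];

usort($jobs, function (array $a, array $b): int {
    // Higher priority first; ties are broken by the older created_at.
    return [$b['priority'], $a['created_at']] <=> [$a['priority'], $b['created_at']];
});

echo implode(',', array_column($jobs, 'id')), "\n"; // 4,3,2,1
```

The two webhooks share priority 8, so the one created earlier (id 4) runs first.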

Retry Logic

Exponential Backoff

Failed jobs are automatically retried with increasing delays:

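Attempt   Delay    Status
1         0 min    Immediate
2         2 min    Retry
3         4 min    Retry
4         8 min    Final attempt
5+        -        Failed

The schedule above shows a job allowed four attempts. The job types documented earlier set Max Attempts to 3, so under that limit a job would be marked failed after its third attempt.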
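
The delays in the table double with each retry. A minimal sketch of that rule follows, assuming the delay is 2^(attempt - 1) minutes from the second attempt onward; example_backoff_minutes() is a hypothetical name and the plugin may compute its delays differently.

```php
<?php
// Delay in minutes before a given attempt, matching the documented schedule:
// attempt 1 runs immediately, then 2, 4 and 8 minutes.
function example_backoff_minutes(int $attempt): int
{
    if ($attempt <= 1) {
        return 0;
    }
    return 2 ** ($attempt - 1);
}

foreach ([1, 2, 3, 4] as $attempt) {
    echo $attempt, ' => ', example_backoff_minutes($attempt), " min\n";
}
```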

Scheduled Retry

When a job fails, it's rescheduled with a scheduled_at timestamp:

SELECT 
id,
job_type,
attempts,
scheduled_at,
error_message
FROM rm_queue
WHERE status = 'pending'
AND scheduled_at > NOW()
ORDER BY scheduled_at;
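
The query above lists jobs that are still waiting for their retry time. The complementary check, whether a pending job is due yet, can be sketched as below. It mirrors the condition "scheduled_at IS NULL OR scheduled_at <= NOW()" used elsewhere in this guide; example_is_due() is a hypothetical helper, not a plugin function.

```php
<?php
// A job is due when it has no scheduled_at, or when scheduled_at has passed.
function example_is_due(?string $scheduled_at, DateTimeImmutable $now): bool
{
    if ($scheduled_at === null) {
        return true; // Never delayed: eligible immediately.
    }
    return new DateTimeImmutable($scheduled_at) <= $now;
}

$now = new DateTimeImmutable('2026-03-26 12:00:00');
var_dump(example_is_due(null, $now));                  // bool(true)
var_dump(example_is_due('2026-03-26 11:58:00', $now)); // bool(true)
var_dump(example_is_due('2026-03-26 12:04:00', $now)); // bool(false)
```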

Monitoring

Queue Statistics

Via REST API

curl -X GET \
'https://your-site.com/wp-json/gsm-middleware/v1/inventory/stats'

Response:

{
  "queue": {
    "total": 150,
    "pending": 5,
    "processing": 2,
    "completed": 140,
    "failed": 3
  }
}
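
A response shaped like this can feed a simple failure-rate check. The sketch below parses the sample JSON and compares the share of finished jobs that failed against a threshold; the 5% threshold and the alert message are arbitrary illustrations, not plugin settings.

```php
<?php
// Sample body copied from the documented response shape.
$json  = '{"queue":{"total":150,"pending":5,"processing":2,"completed":140,"failed":3}}';
$stats = json_decode($json, true, 512, JSON_THROW_ON_ERROR)['queue'];

// Only finished jobs (completed + failed) count toward the rate.
$finished     = $stats['completed'] + $stats['failed'];
$failure_rate = $finished > 0 ? $stats['failed'] / $finished : 0.0;

printf("Failure rate: %.1f%%\n", $failure_rate * 100);
if ($failure_rate > 0.05) { // Assumed threshold, for illustration only.
    echo "Alert: failure rate above 5%\n";
}
```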

Via Database

SELECT 
COUNT(*) as total,
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
FROM rm_queue
WHERE created_at > DATE_SUB(NOW(), INTERVAL 24 HOUR);

Jobs by Type

SELECT 
job_type,
COUNT(*) as total,
AVG(TIMESTAMPDIFF(SECOND, started_at, completed_at)) as avg_duration_sec
FROM rm_queue
WHERE status = 'completed'
AND completed_at > DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY job_type;

Recent Failures

SELECT 
id,
job_type,
error_message,
attempts,
failed_at
FROM rm_queue
WHERE status = 'failed'
ORDER BY failed_at DESC
LIMIT 20;

Stuck Job Detection

Jobs stuck in "processing" state for > 30 minutes are automatically released by hourly maintenance.
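
The detection rule can be sketched in plain PHP as a filter over job rows. This is an in-memory illustration with made-up data; example_find_stuck_jobs() is a hypothetical name, and the plugin performs the equivalent check in the database.

```php
<?php
// Return the IDs of jobs that have been 'processing' for longer than the timeout.
function example_find_stuck_jobs(array $jobs, DateTimeImmutable $now, int $timeout_minutes = 30): array
{
    $cutoff = $now->modify("-{$timeout_minutes} minutes");
    $stuck  = [];
    foreach ($jobs as $job) {
        if ($job['status'] === 'processing'
            && new DateTimeImmutable($job['started_at']) < $cutoff) {
            $stuck[] = $job['id'];
        }
    }
    return $stuck;
}

$now  = new DateTimeImmutable('2026-03-26 12:00:00');
$jobs = [
    ['id' => 1, 'status' => 'processing', 'started_at' => '2026-03-26 11:20:00'], // running 40 min
    ['id' => 2, 'status' => 'processing', 'started_at' => '2026-03-26 11:50:00'], // running 10 min
    ['id' => 3, 'status' => 'completed',  'started_at' => '2026-03-26 10:00:00'],
];
print_r(example_find_stuck_jobs($jobs, $now));
```

Only job 1 is reported. The 30-minute threshold leaves headroom above the 5-20 minute duration listed for inventory syncs, so a slow but healthy sync is not released prematurely.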

Manual Release

UPDATE rm_queue
SET status = 'pending',
error_message = 'Released due to timeout'
WHERE status = 'processing'
AND started_at < DATE_SUB(NOW(), INTERVAL 30 MINUTE);

Via WP-CLI

wp eval "
global \$wpdb;
\$queue = new \GSM\Middleware\Queue\Queue_System(\$wpdb);
echo \$queue->release_stuck_jobs(30) . ' jobs released';
"

Maintenance

Automatic Cleanup

The queue system automatically cleans up old completed jobs:

  • Frequency: Hourly
  • Retention: 7 days (completed jobs)
  • Action: gsm_queue_maintenance cron job

Manual Cleanup

-- Delete completed jobs older than 7 days
DELETE FROM rm_queue
WHERE status = 'completed'
AND completed_at < DATE_SUB(NOW(), INTERVAL 7 DAY);

Via WP-CLI

wp eval "
global \$wpdb;
\$queue = new \GSM\Middleware\Queue\Queue_System(\$wpdb);
echo \$queue->cleanup_old_jobs(7) . ' jobs deleted';
"

Manual Job Management

Retry Failed Job

UPDATE rm_queue
SET status = 'pending',
    attempts = 0,
    scheduled_at = NULL,
    error_message = NULL
WHERE id = 123
  AND status = 'failed';

Cancel Job

UPDATE rm_queue
SET status = 'failed',
error_message = 'Manually cancelled'
WHERE id = 123;

Inspect Job Payload

SELECT 
id,
job_type,
payload,
created_at
FROM rm_queue
WHERE id = 123;

The payload is stored as JSON:

{
  "site_id": 5,
  "webhook_log_id": 456
}

Troubleshooting

Queue Not Processing

  1. Check cron is running

    wp cron event list | grep gsm_process_queue
  2. Verify cron schedule

    • Should run every 5 minutes
    • Check the recurrence column in the wp cron event list output
  3. Test manual processing

    wp cron event run gsm_process_queue
  4. Check for errors

    tail -f /var/log/wordpress/debug.log | grep "GSM Queue"

Jobs Failing Repeatedly

  1. Check error messages

    SELECT 
    id,
    job_type,
    error_message,
    attempts,
    payload
    FROM rm_queue
    WHERE status = 'failed'
    ORDER BY failed_at DESC
    LIMIT 10;
  2. Common issues

    • API credentials expired
    • Network connectivity
    • Database connection issues
    • Memory limits
  3. Fix and retry

    • Resolve the underlying issue
    • Reset job to pending:
    UPDATE rm_queue 
    SET status = 'pending',
    attempts = 0,
    error_message = NULL
    WHERE id = 123;

High Queue Backlog

  1. Check processing rate

    SELECT 
    DATE(created_at) as date,
    COUNT(*) as jobs_created,
    SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as jobs_completed
    FROM rm_queue
    WHERE created_at > DATE_SUB(NOW(), INTERVAL 7 DAY)
    GROUP BY DATE(created_at);
  2. Increase processing frequency

    • Queue runs every 5 minutes by default
    • The interval can be shortened to 2-3 minutes if needed
  3. Process more jobs per run

    • Default: 10 jobs per run
    • Can be increased in code if needed
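
The two settings above multiply into an upper bound on throughput. The arithmetic can be sketched as follows, assuming every run finishes within its interval and always has a full batch to process; example_max_jobs_per_hour() is a hypothetical helper.

```php
<?php
// Upper bound on jobs processed per hour for a cron interval and batch size.
function example_max_jobs_per_hour(int $interval_minutes, int $jobs_per_run): int
{
    return intdiv(60, $interval_minutes) * $jobs_per_run;
}

echo example_max_jobs_per_hour(5, 10), "\n"; // 120 (documented defaults)
echo example_max_jobs_per_hour(2, 10), "\n"; // 300 (shorter interval)
echo example_max_jobs_per_hour(5, 25), "\n"; // 300 (larger batch)
```

With the defaults the ceiling is 120 jobs per hour, so a sustained arrival rate above that will grow the backlog regardless of how healthy individual jobs are.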

Memory Issues

If jobs are failing with memory errors:

  1. Check memory limit

    php -i | grep memory_limit
  2. Increase PHP memory limit

    // In wp-config.php
    define('WP_MEMORY_LIMIT', '512M');
    define('WP_MAX_MEMORY_LIMIT', '512M');
  3. Process fewer items per job

    • Reduce batch size in inventory sync
    • Split large operations into smaller jobs
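
Splitting a large operation into smaller jobs can be as simple as chunking the work list before enqueuing. The sketch below builds one payload per batch with array_chunk(); the product_ids payload key and example_build_batch_payloads() are hypothetical, since the documented inventory_sync payload carries only site_id.

```php
<?php
// Build one payload per batch of IDs. Each payload would then be passed to
// a separate enqueue() call so that no single job holds the full list.
function example_build_batch_payloads(int $site_id, array $product_ids, int $batch_size): array
{
    $payloads = [];
    foreach (array_chunk($product_ids, $batch_size) as $chunk) {
        $payloads[] = ['site_id' => $site_id, 'product_ids' => $chunk];
    }
    return $payloads;
}

$payloads = example_build_batch_payloads(5, range(1, 250), 100);
echo count($payloads), "\n";                   // 3
echo count($payloads[2]['product_ids']), "\n"; // 50
```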

Performance Tuning

Optimize Query Performance

Ensure indexes are created:

-- Check indexes
SHOW INDEX FROM rm_queue;

-- Should have indexes on:
-- - status, priority, created_at (composite)
-- - job_type
-- - site_id
-- - scheduled_at
-- - reference_id (unique, added by migration 007 in v1.8.0)

Monitor Query Performance

EXPLAIN SELECT * 
FROM rm_queue
WHERE status = 'pending'
AND (scheduled_at IS NULL OR scheduled_at <= NOW())
ORDER BY priority DESC, created_at ASC
LIMIT 10;

Database Maintenance

-- Optimize table
OPTIMIZE TABLE rm_queue;

-- Analyze table
ANALYZE TABLE rm_queue;

Best Practices

  1. Keep Payloads Small

    • Store references, not full data
    • Link to webhook logs instead of duplicating payloads
  2. Set Appropriate Priorities

    • Use high priority for time-sensitive jobs
    • Use normal priority for scheduled operations
    • Use low priority for cleanup tasks
  3. Monitor Regularly

    • Check queue stats daily
    • Set up alerts for high failure rates
    • Review stuck jobs weekly
  4. Clean Up Regularly

    • Automatic cleanup keeps last 7 days
    • Manual cleanup for older jobs if needed
    • Archive important job data before deletion
  5. Handle Failures Gracefully

    • Log detailed error messages
    • Don't retry indefinitely
    • Alert on repeated failures

API Reference

Enqueue Job

use GSM\Middleware\Queue\Queue_System;

global $wpdb;
$queue = new Queue_System($wpdb);

$job_id = $queue->enqueue(
    'inventory_sync',  // Job type
    ['site_id' => 5],  // Payload
    5,                 // Priority (1-10)
    5                  // Site ID (optional)
);

Enqueue Unique Job (Idempotent)

Use when the same event may be received multiple times and only one active job should exist:

$result = $queue->enqueue_unique(
    'payarc_dispute',          // Job type
    ['webhook_id' => 42],      // Payload
    'payarc_case_CASE-12345',  // reference_id (idempotency key)
    7,                         // Priority (1-10)
    null                       // Site ID (optional)
);

if ( null === $result ) {
    // Already queued: safe to acknowledge and move on.
} elseif ( false === $result ) {
    // Database error: log and handle.
} else {
    // $result is the new job ID (int).
}

Process Queue

$processed = $queue->process(
    function ($job) {
        // Handle job
        $site_id = $job['payload']['site_id'];
        // ... process job
    },
    'inventory_sync', // Job type filter (optional)
    10                // Max jobs to process
);

Get Statistics

$stats = $queue->get_stats();
// Returns an array with the keys: total, pending, processing, completed, failed

Cleanup Old Jobs

$deleted = $queue->cleanup_old_jobs(7); // Days
echo "$deleted jobs deleted";

Release Stuck Jobs

$released = $queue->release_stuck_jobs(30); // Minutes
echo "$released jobs released";