Job queues are essential components of modern applications: they let you handle resource-intensive or time-consuming tasks asynchronously, improving application responsiveness by moving heavy processing out of the request-response cycle.
In this guide, we'll walk through building a job queue system using Node.js, Bull (a Redis-backed queue library), and Neon Postgres to process jobs efficiently at scale.
Prerequisites
To follow the steps in this guide, you will need the following:
- Node.js 18 or later
- A Neon account
- Redis installed locally
- Basic understanding of JavaScript and PostgreSQL
Understanding job queues
Job queues allow applications to offload time-consuming tasks to be processed in the background. Some common use cases include:
- Email delivery
- Image or video processing
- Data aggregation and reporting
- Webhook delivery
- Regular maintenance tasks
Here's how our architecture will work:
- The main application adds jobs to the queue
- Bull manages the queue in Redis
- Worker processes consume jobs from the queue
- Job statuses and results are stored in Neon Postgres
- The application can check job status and retrieve results
This separation improves system performance, reliability, and scalability. It also allows for better error handling, retry logic, and monitoring, and it improves the user experience, since the application can respond quickly to user requests regardless of how long a job takes to process.
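To make this concrete, here's a minimal sketch of the pattern, assuming the jobs table and queue names we set up later in this guide. The full version is built step by step below:
// A condensed preview of the pattern (minimal sketch; the real version follows below)
const Bull = require('bull');
const { Pool } = require('pg');

const emailQueue = new Bull('email', 'redis://localhost:6379');
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Producer: enqueue a job in Redis and record it in Postgres
async function enqueue(data) {
  const job = await emailQueue.add(data);
  await pool.query(
    `INSERT INTO jobs (job_id, queue_name, data, status) VALUES ($1, 'email', $2, 'pending')`,
    [job.id.toString(), JSON.stringify(data)]
  );
  return job.id;
}

// Consumer: a worker picks the job up and reports the outcome
emailQueue.process(async (job) => {
  // ...do the actual work here...
  await pool.query(`UPDATE jobs SET status = 'completed' WHERE job_id = $1`, [job.id.toString()]);
});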
Create a Neon project
First, let's set up a new Neon Postgres database to store our job metadata.
1. Navigate to the Neon Console and create a new project.
2. Choose a name for your project, for example "job-queue-system".
3. After creating the project, you'll see the connection details. Save the connection string; you'll need it to connect to your database.
4. Using the SQL Editor in the Neon Console or your preferred PostgreSQL client, create the tables for our job queue system:
CREATE TABLE jobs (
id SERIAL PRIMARY KEY,
job_id VARCHAR(255) UNIQUE NOT NULL,
queue_name VARCHAR(100) NOT NULL,
data JSONB,
status VARCHAR(50) NOT NULL,
result JSONB,
error TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
started_at TIMESTAMP,
completed_at TIMESTAMP,
attempts INT DEFAULT 0,
max_attempts INT DEFAULT 3
);
CREATE INDEX idx_jobs_status ON jobs(status);
CREATE INDEX idx_jobs_queue_name ON jobs(queue_name);
This jobs table will track:
- Unique job identifiers
- The queue a job belongs to
- Job data/parameters
- Current status (pending, active, completed, failed)
- Results or errors
- Timestamps for tracking job lifecycle
- Retry information
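As an example of why the retry columns are useful, once the system is running, a query along these lines (illustrative, using the status values adopted later in this guide) surfaces jobs that have permanently exhausted their retries:
-- Jobs that failed and have no retries left
SELECT job_id, queue_name, error, attempts
FROM jobs
WHERE status = 'failed'
  AND attempts >= max_attempts;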
Set up your Node.js project
Now that our database is ready, let's create a Node.js project:
- Create a new directory for your project:
mkdir job-queue-system
cd job-queue-system
- Initialize a new Node.js project:
npm init -y
- Install the required dependencies:
npm install bull pg dotenv express
These packages provide:
- bull: Queue management with Redis
- pg: PostgreSQL client for Node.js
- dotenv: Environment variable management
- express: Web framework to create a simple API for our example
- Create a .env file to store your configuration:
# Database
DATABASE_URL=postgres://[user]:[password]@[hostname]/[database]?sslmode=require
# Redis
REDIS_URL=redis://localhost:6379
# Application
PORT=3000
Replace the DATABASE_URL with your Neon connection string.
Integrate Bull for job processing
Bull is a Node.js library that implements a fast queue system based on Redis.
If you don't have Redis installed, you can run it using Docker:
docker run -d -p 6379:6379 redis
Let's set up the basic queue structure for our job queue system.
Create a file named src/queues/index.js:
// src/queues/index.js
const Bull = require('bull');
const dotenv = require('dotenv');
dotenv.config();
// Create queues
const emailQueue = new Bull('email', process.env.REDIS_URL);
const imageProcessingQueue = new Bull('image-processing', process.env.REDIS_URL);
const dataExportQueue = new Bull('data-export', process.env.REDIS_URL);
// Export the queues to be used elsewhere in the application
module.exports = {
emailQueue,
imageProcessingQueue,
dataExportQueue,
};
Each queue groups related jobs together: this file creates three queues for three different types of work. In a production application, you might have many more queues for various tasks.
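Bull's constructor also accepts an options object that lets you tune each queue independently. For example, here's an illustrative variant of the email queue with a rate limiter and default job options (the values are placeholders, not recommendations):
// Illustrative only: a queue with a rate limit and default retry behavior
const Bull = require('bull');

const emailQueue = new Bull('email', process.env.REDIS_URL, {
  limiter: { max: 100, duration: 60000 }, // process at most 100 jobs per minute
  defaultJobOptions: {
    attempts: 3, // retry failed jobs up to 3 times by default
    removeOnComplete: true, // keep Redis lean; Postgres holds the job history
  },
});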
Now, let's create a utility to add jobs to these queues. Create a file named src/utils/queueHelper.js:
// src/utils/queueHelper.js
const { Pool } = require('pg');
const dotenv = require('dotenv');
dotenv.config();
// Initialize PostgreSQL connection pool
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
ssl: {
rejectUnauthorized: false,
},
});
// Helper function to add a job to a queue and record it in Postgres
async function addJob(queue, data, options = {}) {
const client = await pool.connect();
try {
await client.query('BEGIN');
// Add job to Bull queue
const job = await queue.add(data, options);
// Record job in Postgres
const result = await client.query(
`INSERT INTO jobs (job_id, queue_name, data, status)
VALUES ($1, $2, $3, $4)
RETURNING id`,
[job.id.toString(), queue.name, JSON.stringify(data), 'pending']
);
await client.query('COMMIT');
return {
jobId: job.id,
dbId: result.rows[0].id,
};
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
// Helper function to update job status in Postgres
async function updateJobStatus(jobId, queueName, updates) {
const client = await pool.connect();
try {
// Build the SET clause based on provided updates
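// NOTE: column names are interpolated into the SQL directly, so this function
// must only be called with trusted, hard-coded keys (as in this guide),
// never with user-supplied field names.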
const setClauses = [];
const values = [jobId, queueName];
let paramIndex = 3;
for (const [key, value] of Object.entries(updates)) {
setClauses.push(`${key} = $${paramIndex}`);
values.push(value);
paramIndex++;
}
if (setClauses.length === 0) {
return;
}
const setClause = setClauses.join(', ');
await client.query(
`UPDATE jobs
SET ${setClause}
WHERE job_id = $1 AND queue_name = $2`,
values
);
} finally {
client.release();
}
}
module.exports = {
addJob,
updateJobStatus,
};
This helper file provides two main functions:
- addJob: Adds a job to a Bull queue and records it in our Neon Postgres database
- updateJobStatus: Updates a job's status in Neon Postgres as it progresses through the queue
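As a quick usage sketch (assuming the queues module created above), enqueueing a delayed email with retries could look like this; attempts and delay are standard Bull job options:
// Sketch: enqueue an email to be sent in 5 minutes, with up to 3 attempts
const { emailQueue } = require('./queues');
const { addJob } = require('./utils/queueHelper');

async function queueReminder() {
  const { jobId, dbId } = await addJob(
    emailQueue,
    { to: 'user@example.com', subject: 'Reminder', body: 'Your export is ready.' },
    { attempts: 3, delay: 5 * 60 * 1000 }
  );
  console.log(`Queued Bull job ${jobId} (row ${dbId} in Postgres)`);
}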
Create the job processor
Now, let's create processors for each type of job. We'll create processors for email sending, image processing, and data exports. A processor is a function that takes a job object from Bull, processes it, and updates the job status in Postgres.
First, let's set up our directory structure:
mkdir -p src/processors
Create a file for email processing at src/processors/emailProcessor.js:
// src/processors/emailProcessor.js
const { updateJobStatus } = require('../utils/queueHelper');
async function sendEmail(to, subject, body) {
// In a real application, you'd use a library like Nodemailer
// This is a simplified example
console.log(`Sending email to ${to}`);
console.log(`Subject: ${subject}`);
console.log(`Body: ${body}`);
// Simulate network delay
await new Promise((resolve) => setTimeout(resolve, 1000));
// Return success
return { delivered: true, timestamp: new Date() };
}
// The processor function takes a job object from Bull
async function processEmailJob(job) {
try {
// Update job status in Postgres
await updateJobStatus(job.id, 'email', {
status: 'active',
started_at: new Date().toISOString(),
});
// Extract job data
const { to, subject, body } = job.data;
// Process the job
const result = await sendEmail(to, subject, body);
// Update job status on success
await updateJobStatus(job.id, 'email', {
status: 'completed',
result: JSON.stringify(result),
completed_at: new Date().toISOString(),
});
// Return the result
return result;
} catch (error) {
// Update job status on failure
await updateJobStatus(job.id, 'email', {
status: 'failed',
error: error.message,
completed_at: new Date().toISOString(),
});
// Re-throw the error for Bull to handle
throw error;
}
}
module.exports = processEmailJob;
Similarly, create a processor for image processing at src/processors/imageProcessor.js:
// src/processors/imageProcessor.js
const { updateJobStatus } = require('../utils/queueHelper');
async function processImage(imageUrl, options) {
// In a real application, you'd use libraries like Sharp
// This is a simplified example
console.log(`Processing image from ${imageUrl}`);
console.log('Options:', options);
// Simulate CPU-intensive task
await new Promise((resolve) => setTimeout(resolve, 3000));
// Return processed image details
return {
processed: true,
originalUrl: imageUrl,
processedUrl: `processed-${imageUrl}`,
dimensions: { width: 800, height: 600 },
timestamp: new Date(),
};
}
async function processImageJob(job) {
try {
// Update job status in Postgres
await updateJobStatus(job.id, 'image-processing', {
status: 'active',
started_at: new Date().toISOString(),
});
// Extract job data
const { imageUrl, options } = job.data;
// Process the job
const result = await processImage(imageUrl, options);
// Update job status on success
await updateJobStatus(job.id, 'image-processing', {
status: 'completed',
result: JSON.stringify(result),
completed_at: new Date().toISOString(),
});
// Return the result
return result;
} catch (error) {
// Update job status on failure
await updateJobStatus(job.id, 'image-processing', {
status: 'failed',
error: error.message,
completed_at: new Date().toISOString(),
});
// Re-throw the error for Bull to handle
throw error;
}
}
module.exports = processImageJob;
Now, let's create a processor for data exports at src/processors/dataExportProcessor.js:
// src/processors/dataExportProcessor.js
const { updateJobStatus } = require('../utils/queueHelper');
const { Pool } = require('pg');
const dotenv = require('dotenv');
dotenv.config();
// Initialize PostgreSQL connection pool
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
ssl: {
rejectUnauthorized: false,
},
});
async function exportData(query, format) {
console.log(`Executing query: ${query}`);
console.log(`Export format: ${format}`);
// Actually execute the query against Neon Postgres
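// WARNING: running SQL straight from job data is fine for a demo, but in a
// real system you should accept a named report or validated parameters
// instead of arbitrary queries.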
const result = await pool.query(query);
// Simulate file creation
await new Promise((resolve) => setTimeout(resolve, 2000));
// Return export details
return {
records: result.rowCount,
format: format,
downloadUrl: `https://example.com/exports/${Date.now()}.${format}`,
timestamp: new Date(),
};
}
async function processDataExportJob(job) {
try {
// Update job status in Postgres
await updateJobStatus(job.id, 'data-export', {
status: 'active',
started_at: new Date().toISOString(),
});
// Extract job data
const { query, format } = job.data;
// Process the job
const result = await exportData(query, format);
// Update job status on success
await updateJobStatus(job.id, 'data-export', {
status: 'completed',
result: JSON.stringify(result),
completed_at: new Date().toISOString(),
});
// Return the result
return result;
} catch (error) {
// Update job status on failure
await updateJobStatus(job.id, 'data-export', {
status: 'failed',
error: error.message,
completed_at: new Date().toISOString(),
});
// Re-throw the error for Bull to handle
throw error;
}
}
module.exports = processDataExportJob;
Each processor function follows a similar pattern:
- Update the job status to active when processing starts
- Extract the job data from the Bull job object
- Process the job
- Update the job status to completed on success or failed on error
- Return the result
The core logic of each processor is kept separate from the job queue management, which allows for easier testing, maintenance, and reuse.
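Since the three processors differ only in their queue name and work function, you could optionally factor the shared status-tracking boilerplate into a small helper. This is a refactor sketch, not code used in the rest of this guide:
// src/processors/createProcessor.js (optional refactor sketch)
const { updateJobStatus } = require('../utils/queueHelper');

// Wraps a work function with the status-tracking pattern shown above
function createProcessor(queueName, work) {
  return async function (job) {
    await updateJobStatus(job.id, queueName, {
      status: 'active',
      started_at: new Date().toISOString(),
    });
    try {
      const result = await work(job.data);
      await updateJobStatus(job.id, queueName, {
        status: 'completed',
        result: JSON.stringify(result),
        completed_at: new Date().toISOString(),
      });
      return result;
    } catch (error) {
      await updateJobStatus(job.id, queueName, {
        status: 'failed',
        error: error.message,
        completed_at: new Date().toISOString(),
      });
      throw error; // re-throw so Bull can handle retries
    }
  };
}

module.exports = createProcessor;
With this helper, the email processor reduces to createProcessor('email', ({ to, subject, body }) => sendEmail(to, subject, body)).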
Implement PostgreSQL job tracking
With the processors in place, let's create a module to retrieve job information from PostgreSQL. Create a file named src/utils/jobsRepository.js:
// src/utils/jobsRepository.js
const { Pool } = require('pg');
const dotenv = require('dotenv');
dotenv.config();
// Initialize PostgreSQL connection pool
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
ssl: {
rejectUnauthorized: false,
},
});
async function getJobById(jobId) {
const result = await pool.query('SELECT * FROM jobs WHERE job_id = $1', [jobId]);
return result.rows[0] || null;
}
async function getJobsByStatus(status, limit = 100, offset = 0) {
const result = await pool.query(
'SELECT * FROM jobs WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3',
[status, limit, offset]
);
return result.rows;
}
async function getJobsByQueue(queueName, limit = 100, offset = 0) {
const result = await pool.query(
'SELECT * FROM jobs WHERE queue_name = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3',
[queueName, limit, offset]
);
return result.rows;
}
async function getJobStats() {
const result = await pool.query(`
SELECT
queue_name,
status,
COUNT(*) as count
FROM jobs
GROUP BY queue_name, status
ORDER BY queue_name, status
`);
return result.rows;
}
module.exports = {
getJobById,
getJobsByStatus,
getJobsByQueue,
getJobStats,
};
This module provides several functions to query job information from our Postgres database:
- getJobById: Retrieve a specific job by its ID
- getJobsByStatus: Get jobs filtered by their status
- getJobsByQueue: Get jobs from a specific queue
- getJobStats: Get statistics about jobs across all queues
These functions will be used by the API to provide job status and statistics to the user.
Build retry and error handling
Bull provides built-in retry capabilities. Let's set up our worker with proper retry and error handling. Create a file named src/worker.js:
// src/worker.js
const { emailQueue, imageProcessingQueue, dataExportQueue } = require('./queues');
const processEmailJob = require('./processors/emailProcessor');
const processImageJob = require('./processors/imageProcessor');
const processDataExportJob = require('./processors/dataExportProcessor');
const { updateJobStatus } = require('./utils/queueHelper');
// Set up processors with retry logic
emailQueue.process(processEmailJob);
imageProcessingQueue.process(processImageJob);
dataExportQueue.process(processDataExportJob);
// Global error handlers for each queue
const setupErrorHandlers = (queue) => {
queue.on('failed', async (job, err) => {
console.error(`Job ${job.id} in ${queue.name} queue failed:`, err.message);
// Update job status and increment attempt count
await updateJobStatus(job.id, queue.name, {
status: job.attemptsMade >= job.opts.attempts ? 'failed' : 'waiting',
attempts: job.attemptsMade,
error: err.message,
});
});
queue.on('completed', async (job, result) => {
console.log(`Job ${job.id} in ${queue.name} queue completed`);
});
queue.on('stalled', async (job) => {
console.warn(`Job ${job.id} in ${queue.name} queue has stalled`);
// Update job status
await updateJobStatus(job.id, queue.name, {
status: 'stalled',
});
});
};
// Set up error handlers for all queues
setupErrorHandlers(emailQueue);
setupErrorHandlers(imageProcessingQueue);
setupErrorHandlers(dataExportQueue);
console.log('Worker started processing jobs...');
This worker file:
- Imports all our queues and job processors
- Assigns each processor to its respective queue
- Sets up error handlers for failed, completed, and stalled jobs
- Updates the job status in Postgres based on these events
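Two optional refinements worth knowing about, both standard Bull APIs (the values below are illustrative): queue.process() accepts a concurrency argument, and queue.close() lets the worker shut down gracefully.
// Process up to 5 image jobs in parallel (this replaces the single-argument
// imageProcessingQueue.process(...) call above; a queue's processor can
// only be registered once)
imageProcessingQueue.process(5, processImageJob);

// Stop taking new jobs and let in-flight ones finish on shutdown
process.on('SIGTERM', async () => {
  await Promise.all([emailQueue.close(), imageProcessingQueue.close(), dataExportQueue.close()]);
  process.exit(0);
});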
Set up a simple API
Let's create a simple Express API to add jobs to the queue and check their status. Create a file named src/api.js:
// src/api.js
const express = require('express');
const { emailQueue, imageProcessingQueue, dataExportQueue } = require('./queues');
const { addJob } = require('./utils/queueHelper');
const {
getJobById,
getJobsByStatus,
getJobsByQueue,
getJobStats,
} = require('./utils/jobsRepository');
const app = express();
app.use(express.json());
const port = process.env.PORT || 3000;
// Add email job
app.post('/jobs/email', async (req, res) => {
try {
const { to, subject, body } = req.body;
// Validate input
if (!to || !subject || !body) {
return res.status(400).json({ error: 'Missing required fields' });
}
// Add job to queue with 3 retry attempts
const job = await addJob(
emailQueue,
{ to, subject, body },
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
}
);
res.status(201).json({
message: 'Email job added to queue',
jobId: job.jobId,
});
} catch (error) {
console.error('Error adding email job:', error);
res.status(500).json({ error: 'Failed to add job to queue' });
}
});
// Add image processing job
app.post('/jobs/image', async (req, res) => {
try {
const { imageUrl, options } = req.body;
// Validate input
if (!imageUrl) {
return res.status(400).json({ error: 'Missing required fields' });
}
// Add job to queue with 2 retry attempts
const job = await addJob(
imageProcessingQueue,
{ imageUrl, options },
{
attempts: 2,
backoff: {
type: 'fixed',
delay: 5000,
},
}
);
res.status(201).json({
message: 'Image processing job added to queue',
jobId: job.jobId,
});
} catch (error) {
console.error('Error adding image processing job:', error);
res.status(500).json({ error: 'Failed to add job to queue' });
}
});
// Add data export job
app.post('/jobs/export', async (req, res) => {
try {
const { query, format } = req.body;
// Validate input
if (!query || !format) {
return res.status(400).json({ error: 'Missing required fields' });
}
// Add job to queue with 3 retry attempts
const job = await addJob(
dataExportQueue,
{ query, format },
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
}
);
res.status(201).json({
message: 'Data export job added to queue',
jobId: job.jobId,
});
} catch (error) {
console.error('Error adding data export job:', error);
res.status(500).json({ error: 'Failed to add job to queue' });
}
});
// Get job status
app.get('/jobs/:id', async (req, res) => {
try {
const job = await getJobById(req.params.id);
if (!job) {
return res.status(404).json({ error: 'Job not found' });
}
res.json(job);
} catch (error) {
console.error('Error fetching job:', error);
res.status(500).json({ error: 'Failed to fetch job' });
}
});
// Get jobs by status
app.get('/jobs/status/:status', async (req, res) => {
try {
const limit = parseInt(req.query.limit) || 100;
const offset = parseInt(req.query.offset) || 0;
const jobs = await getJobsByStatus(req.params.status, limit, offset);
res.json(jobs);
} catch (error) {
console.error('Error fetching jobs by status:', error);
res.status(500).json({ error: 'Failed to fetch jobs' });
}
});
// Get jobs by queue
app.get('/jobs/queue/:queue', async (req, res) => {
try {
const limit = parseInt(req.query.limit) || 100;
const offset = parseInt(req.query.offset) || 0;
const jobs = await getJobsByQueue(req.params.queue, limit, offset);
res.json(jobs);
} catch (error) {
console.error('Error fetching jobs by queue:', error);
res.status(500).json({ error: 'Failed to fetch jobs' });
}
});
// Get job stats
app.get('/stats', async (req, res) => {
try {
const stats = await getJobStats();
res.json(stats);
} catch (error) {
console.error('Error fetching job stats:', error);
res.status(500).json({ error: 'Failed to fetch job stats' });
}
});
app.listen(port, () => {
console.log(`API server listening on port ${port}`);
});
This API provides endpoints to:
- Add jobs to the different queues
- Check the status of a specific job
- List jobs by status or queue
- Get job statistics
We will later add a Bull Board dashboard to monitor the job queues in real time, and we'll use these endpoints to test our job queue system.
Create the main entry point
With all the above in place, let's create the main entry point for our application. Create a file named src/index.js:
// src/index.js
const dotenv = require('dotenv');
dotenv.config();
// In a production environment, you would typically run the API and worker in separate processes
// For simplicity, we're starting both in the same file
const startMode = process.env.START_MODE || 'all';
if (startMode === 'all' || startMode === 'api') {
require('./api');
console.log('API server started');
}
if (startMode === 'all' || startMode === 'worker') {
require('./worker');
console.log('Worker process started');
}
Update your package.json to include start scripts:
{
"scripts": {
"start": "node src/index.js",
"start:api": "START_MODE=api node src/index.js",
"start:worker": "START_MODE=worker node src/index.js"
}
}
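One caveat: the inline START_MODE=... syntax works in POSIX shells but not in Windows' cmd.exe. If you need cross-platform scripts, the cross-env package is a common workaround (a sketch, after running npm install cross-env):
{
  "scripts": {
    "start": "node src/index.js",
    "start:api": "cross-env START_MODE=api node src/index.js",
    "start:worker": "cross-env START_MODE=worker node src/index.js"
  }
}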
Run the application
Now you can run the application:
npm start
This will start both the API server and the worker process, with the API listening at http://localhost:3000. In a production environment, you might want to run them separately:
# Start the API server
npm run start:api
# Start the worker in a different terminal
npm run start:worker
Monitor your job queue
To monitor your job queues in real time, you can use a dashboard tool like Bull Board.
Install Bull Board:
npm install @bull-board/express @bull-board/api
Then add the following to your src/api.js file:
// Add at the top of the file
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');
// Add before app.use(express.json())
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullAdapter(emailQueue),
new BullAdapter(imageProcessingQueue),
new BullAdapter(dataExportQueue),
],
serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());
Now you can access the Bull Board dashboard at http://localhost:3000/admin/queues. In production, be sure to protect this route with authentication before exposing it.
Test your job queue system
Now that you've set up your job queue system, let's test it to ensure everything works correctly. We'll create several test jobs, monitor their progress, and examine the results.
First, make sure your system is running with both the API server and worker process:
npm start
Using a tool like cURL or Postman, you can send requests to your API to create new jobs:
1. Create an Email Job
curl -X POST http://localhost:3000/jobs/email \
-H "Content-Type: application/json" \
-d '{
"to": "test@example.com",
"subject": "Test Email",
"body": "This is a test email from our job queue system."
}'
You should receive a response like:
{
"message": "Email job added to queue",
"jobId": "1"
}
2. Create an Image Processing Job
curl -X POST http://localhost:3000/jobs/image \
-H "Content-Type: application/json" \
-d '{
"imageUrl": "https://example.com/sample-image.jpg",
"options": {
"resize": { "width": 800, "height": 600 },
"format": "webp"
}
}'
3. Create a Data Export Job
curl -X POST http://localhost:3000/jobs/export \
-H "Content-Type: application/json" \
-d '{
"query": "SELECT * FROM jobs LIMIT 10",
"format": "csv"
}'
Monitor job progress
After creating the jobs, you can monitor their progress in several ways:
Check job status via API
Use the job ID returned when you created the job to check its status:
curl http://localhost:3000/jobs/1 | jq
You should see the job details, including its current status (pending, active, completed, or failed):
{
"id": 1,
"job_id": "1",
"queue_name": "email",
"data": {
"to": "test@example.com",
"subject": "Test Email",
"body": "This is a test email from our job queue system."
},
"status": "completed",
"result": { "delivered": true, "timestamp": "2025-03-16T11:32:47.123Z" },
"error": null,
"created_at": "2025-03-16T11:32:45.678Z",
"started_at": "2025-03-16T11:32:46.789Z",
"completed_at": "2025-03-16T11:32:47.890Z",
"attempts": 1,
"max_attempts": 3
}
View jobs by status
To see all jobs with a specific status:
curl http://localhost:3000/jobs/status/completed
This will return a list of all completed jobs. You can also check for "pending", "active", or "failed" jobs.
Check queue statistics
To see statistics about all your queues:
curl http://localhost:3000/stats | jq
This will show you a breakdown of job counts by queue and status (the counts come back as strings because node-postgres returns Postgres bigint values as strings):
[
{ "queue_name": "email", "status": "completed", "count": "1" },
{ "queue_name": "image-processing", "status": "active", "count": "1" },
{ "queue_name": "data-export", "status": "pending", "count": "1" }
]
Bull Board dashboard
If you've set up Bull Board as described earlier, you can visit http://localhost:3000/admin/queues
in your browser to see a visual dashboard of all your queues and jobs.
This dashboard provides a real-time view of your queues, including active, waiting, and completed jobs.
Verify database records
To check that the job information is being correctly stored in your Neon Postgres database, you can use the Neon SQL Editor or any PostgreSQL client to run queries:
SELECT * FROM jobs;
You can also check processing times for completed jobs:
SELECT
job_id,
queue_name,
status,
EXTRACT(EPOCH FROM (completed_at - started_at)) AS processing_time_seconds
FROM jobs
WHERE status = 'completed';
This query will show you the processing time for each completed job in seconds.
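You can extend this idea to per-queue aggregates. For example, a query like the following (illustrative) reports average processing time and completed-job counts per queue:
SELECT
  queue_name,
  COUNT(*) AS completed_jobs,
  ROUND(AVG(EXTRACT(EPOCH FROM (completed_at - started_at)))::numeric, 2) AS avg_seconds
FROM jobs
WHERE status = 'completed'
GROUP BY queue_name
ORDER BY avg_seconds DESC;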
Conclusion
In this guide, you've built a job queue system using Node.js, Bull, and Neon Postgres. This system can handle different types of background tasks, retry failed jobs, and track job status and results in a PostgreSQL database.
The combination of Bull's Redis-backed queue management and Neon's serverless Postgres for persistent job tracking provides a scalable and reliable solution for background processing. It is a great foundation for building more complex job processing systems in your applications.
You can extend this system by adding more specialized queues, extending the monitoring, implementing user notifications, or integrating with other services in your architecture.
Additional resources
Need help?
Join our Discord Server to ask questions or see what others are doing with Neon. Users on paid plans can open a support ticket from the console. For more details, see Getting Support.