fix(scan): prevent exponential slowdown from event loop blocking
- Reduce default CONCURRENT_API_CALLS from 10 to 5 (Sharp AVIF is CPU-intensive) - Create fresh p-limit instance per batch instead of module singleton - Add garbage collection hint between batches - Fix skippedCount tracking (was never incremented) - Increase batch delay from 100ms to 500ms for event loop drainage
This commit is contained in:
@@ -25,14 +25,15 @@ const USERNAME = process.env.API_USERNAME;
|
|||||||
const PASSWORD = process.env.API_PASSWORD;
|
const PASSWORD = process.env.API_PASSWORD;
|
||||||
const MIN_ACTIVITY_ID_SCAN = parseInt(process.env.MIN_ACTIVITY_ID_SCAN || '0', 10);
|
const MIN_ACTIVITY_ID_SCAN = parseInt(process.env.MIN_ACTIVITY_ID_SCAN || '0', 10);
|
||||||
const MAX_ACTIVITY_ID_SCAN = parseInt(process.env.MAX_ACTIVITY_ID_SCAN || '9999', 10);
|
const MAX_ACTIVITY_ID_SCAN = parseInt(process.env.MAX_ACTIVITY_ID_SCAN || '9999', 10);
|
||||||
const CONCURRENT_API_CALLS = parseInt(process.env.CONCURRENT_API_CALLS || '10', 10);
|
// Default 5 concurrent calls (was 10) - Sharp AVIF processing is CPU-intensive and can block event loop
|
||||||
|
const CONCURRENT_API_CALLS = parseInt(process.env.CONCURRENT_API_CALLS || '5', 10);
|
||||||
const CLUB_UPDATE_INTERVAL_MINS = parseInt(process.env.CLUB_UPDATE_INTERVAL_MINS || '60', 10);
|
const CLUB_UPDATE_INTERVAL_MINS = parseInt(process.env.CLUB_UPDATE_INTERVAL_MINS || '60', 10);
|
||||||
const STAFF_UPDATE_INTERVAL_MINS = parseInt(process.env.STAFF_UPDATE_INTERVAL_MINS || '60', 10);
|
const STAFF_UPDATE_INTERVAL_MINS = parseInt(process.env.STAFF_UPDATE_INTERVAL_MINS || '60', 10);
|
||||||
const FIXED_STAFF_ACTIVITY_ID = process.env.FIXED_STAFF_ACTIVITY_ID;
|
const FIXED_STAFF_ACTIVITY_ID = process.env.FIXED_STAFF_ACTIVITY_ID;
|
||||||
const S3_IMAGE_PREFIX = (process.env.S3_PUBLIC_URL_PREFIX || 'files').replace(/\/$/, '');
|
const S3_IMAGE_PREFIX = (process.env.S3_PUBLIC_URL_PREFIX || 'files').replace(/\/$/, '');
|
||||||
|
|
||||||
// Limit concurrent API calls
|
// Module-level counter for skipped activities (reset at start of each scan)
|
||||||
const limit = pLimit(CONCURRENT_API_CALLS);
|
let skippedCount = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process and cache a single activity
|
* Process and cache a single activity
|
||||||
@@ -105,8 +106,9 @@ async function processSingleActivity(activityId: string): Promise<void> {
|
|||||||
|
|
||||||
logger.debug(`Initializing cache for activity ID: ${activityId}`);
|
logger.debug(`Initializing cache for activity ID: ${activityId}`);
|
||||||
await processAndCacheActivity(activityId);
|
await processAndCacheActivity(activityId);
|
||||||
|
} else {
|
||||||
|
skippedCount++;
|
||||||
}
|
}
|
||||||
// else: skip (already cached)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -121,11 +123,13 @@ export async function initializeClubCache(): Promise<void> {
|
|||||||
let processedCount = 0;
|
let processedCount = 0;
|
||||||
let successCount = 0;
|
let successCount = 0;
|
||||||
let errorCount = 0;
|
let errorCount = 0;
|
||||||
let skippedCount = 0;
|
skippedCount = 0; // Reset for this run
|
||||||
|
|
||||||
for (let batchStart = MIN_ACTIVITY_ID_SCAN; batchStart <= MAX_ACTIVITY_ID_SCAN; batchStart += BATCH_SIZE) {
|
for (let batchStart = MIN_ACTIVITY_ID_SCAN; batchStart <= MAX_ACTIVITY_ID_SCAN; batchStart += BATCH_SIZE) {
|
||||||
const batchEnd = Math.min(batchStart + BATCH_SIZE - 1, MAX_ACTIVITY_ID_SCAN);
|
const batchEnd = Math.min(batchStart + BATCH_SIZE - 1, MAX_ACTIVITY_ID_SCAN);
|
||||||
const batchPromises: Promise<void>[] = [];
|
const batchPromises: Promise<void>[] = [];
|
||||||
|
// Create fresh p-limit instance per batch to prevent internal queue accumulation
|
||||||
|
const limit = pLimit(CONCURRENT_API_CALLS);
|
||||||
|
|
||||||
logger.info(`Processing batch ${Math.floor(processedCount / BATCH_SIZE) + 1}/${Math.ceil(totalIds / BATCH_SIZE)} (IDs ${batchStart}-${batchEnd})`);
|
logger.info(`Processing batch ${Math.floor(processedCount / BATCH_SIZE) + 1}/${Math.ceil(totalIds / BATCH_SIZE)} (IDs ${batchStart}-${batchEnd})`);
|
||||||
|
|
||||||
@@ -158,8 +162,13 @@ export async function initializeClubCache(): Promise<void> {
|
|||||||
await Promise.allSettled(batchPromises);
|
await Promise.allSettled(batchPromises);
|
||||||
batchPromises.length = 0;
|
batchPromises.length = 0;
|
||||||
|
|
||||||
|
// Garbage collection hint and longer delay between batches to allow event loop to drain
|
||||||
|
if (global.gc) {
|
||||||
|
global.gc(false);
|
||||||
|
}
|
||||||
|
|
||||||
if (batchEnd < MAX_ACTIVITY_ID_SCAN) {
|
if (batchEnd < MAX_ACTIVITY_ID_SCAN) {
|
||||||
await new Promise(resolve => setTimeout(resolve, 100));
|
await new Promise(resolve => setTimeout(resolve, 500));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -174,6 +183,7 @@ export async function updateStaleClubs(): Promise<void> {
|
|||||||
logger.info('Starting stale club check...');
|
logger.info('Starting stale club check...');
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const updateIntervalMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000;
|
const updateIntervalMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000;
|
||||||
|
const limit = pLimit(CONCURRENT_API_CALLS);
|
||||||
const promises: Promise<void>[] = [];
|
const promises: Promise<void>[] = [];
|
||||||
const activityKeys = await getAllActivityKeys();
|
const activityKeys = await getAllActivityKeys();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user