fix(scan): remove p-limit
This commit is contained in:
@@ -48,7 +48,6 @@ services:
|
|||||||
redis:
|
redis:
|
||||||
image: "redis:8.0-alpine"
|
image: "redis:8.0-alpine"
|
||||||
container_name: dsas-cca-redis
|
container_name: dsas-cca-redis
|
||||||
# Add memory limits to prevent OOM
|
|
||||||
command: redis-server --requirepass "dsas-cca"
|
command: redis-server --requirepass "dsas-cca"
|
||||||
volumes:
|
volumes:
|
||||||
- ./redis_data:/data
|
- ./redis_data:/data
|
||||||
|
|||||||
32
index.ts
32
index.ts
@@ -191,9 +191,11 @@ app.get('/v1/activity/list', async (req: Request, res: Response) => {
|
|||||||
return res.json({});
|
return res.json({});
|
||||||
}
|
}
|
||||||
|
|
||||||
const allActivities = await Promise.all(
|
const allActivities: (ActivityData | null)[] = [];
|
||||||
activityKeys.map(async k => getActivityData(k.substring(ACTIVITY_KEY_PREFIX.length)))
|
for (const k of activityKeys) {
|
||||||
);
|
const activityData = await getActivityData(k.substring(ACTIVITY_KEY_PREFIX.length));
|
||||||
|
allActivities.push(activityData);
|
||||||
|
}
|
||||||
|
|
||||||
/* ---------- gather available filter values for validation ---------- */
|
/* ---------- gather available filter values for validation ---------- */
|
||||||
const availableCategories = new Set<string>();
|
const availableCategories = new Set<string>();
|
||||||
@@ -261,13 +263,13 @@ app.get('/v1/activity/category', async (_req: Request, res: Response) => {
|
|||||||
logger.info('No activity keys found in Redis for categories.');
|
logger.info('No activity keys found in Redis for categories.');
|
||||||
return res.json({});
|
return res.json({});
|
||||||
}
|
}
|
||||||
// Fetch all activity data in parallel
|
// Fetch all activity data sequentially
|
||||||
const allActivityDataPromises = activityKeys.map(async (key) => {
|
const allActivities: (ActivityData | null)[] = [];
|
||||||
|
for (const key of activityKeys) {
|
||||||
const activityId = key.substring(ACTIVITY_KEY_PREFIX.length);
|
const activityId = key.substring(ACTIVITY_KEY_PREFIX.length);
|
||||||
return getActivityData(activityId);
|
const activityData = await getActivityData(activityId);
|
||||||
});
|
allActivities.push(activityData);
|
||||||
|
}
|
||||||
const allActivities = await Promise.all(allActivityDataPromises);
|
|
||||||
|
|
||||||
allActivities.forEach((activityData: ActivityData | null) => {
|
allActivities.forEach((activityData: ActivityData | null) => {
|
||||||
if (activityData &&
|
if (activityData &&
|
||||||
@@ -301,13 +303,13 @@ app.get('/v1/activity/academicYear', async (_req: Request, res: Response) => {
|
|||||||
logger.info('No activity keys found in Redis for academic years.');
|
logger.info('No activity keys found in Redis for academic years.');
|
||||||
return res.json({});
|
return res.json({});
|
||||||
}
|
}
|
||||||
// 1. Fetch all activity data in parallel
|
// 1. Fetch all activity data sequentially
|
||||||
const allActivities = await Promise.all(
|
const allActivities: (ActivityData | null)[] = [];
|
||||||
activityKeys.map(async (key) => {
|
for (const key of activityKeys) {
|
||||||
const activityId = key.substring(ACTIVITY_KEY_PREFIX.length);
|
const activityId = key.substring(ACTIVITY_KEY_PREFIX.length);
|
||||||
return getActivityData(activityId);
|
const activityData = await getActivityData(activityId);
|
||||||
})
|
allActivities.push(activityData);
|
||||||
);
|
}
|
||||||
// 2. Count activities per academic year
|
// 2. Count activities per academic year
|
||||||
allActivities.forEach((activityData: ActivityData | null) => {
|
allActivities.forEach((activityData: ActivityData | null) => {
|
||||||
if (
|
if (
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
// services/cache-manager.ts
|
// services/cache-manager.ts
|
||||||
import { config } from 'dotenv';
|
import { config } from 'dotenv';
|
||||||
import pLimit from 'p-limit';
|
|
||||||
import { fetchActivityData } from '../engage-api/get-activity';
|
import { fetchActivityData } from '../engage-api/get-activity';
|
||||||
import { structActivityData } from '../engage-api/struct-activity';
|
import { structActivityData } from '../engage-api/struct-activity';
|
||||||
import { structStaffData } from '../engage-api/struct-staff';
|
import { structStaffData } from '../engage-api/struct-staff';
|
||||||
@@ -113,63 +112,34 @@ async function processSingleActivity(activityId: string): Promise<void> {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the club cache by scanning through all activity IDs
|
* Initialize the club cache by scanning through all activity IDs
|
||||||
* Processed in batches to prevent memory pressure from accumulating all promises upfront
|
* Processed sequentially in single-threaded mode
|
||||||
*/
|
*/
|
||||||
export async function initializeClubCache(): Promise<void> {
|
export async function initializeClubCache(): Promise<void> {
|
||||||
logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN}`);
|
logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN} (single-threaded mode)`);
|
||||||
|
|
||||||
const totalIds = MAX_ACTIVITY_ID_SCAN - MIN_ACTIVITY_ID_SCAN + 1;
|
const totalIds = MAX_ACTIVITY_ID_SCAN - MIN_ACTIVITY_ID_SCAN + 1;
|
||||||
const BATCH_SIZE = 100;
|
|
||||||
let processedCount = 0;
|
let processedCount = 0;
|
||||||
let successCount = 0;
|
let successCount = 0;
|
||||||
let errorCount = 0;
|
let errorCount = 0;
|
||||||
skippedCount = 0; // Reset for this run
|
skippedCount = 0; // Reset for this run
|
||||||
|
|
||||||
for (let batchStart = MIN_ACTIVITY_ID_SCAN; batchStart <= MAX_ACTIVITY_ID_SCAN; batchStart += BATCH_SIZE) {
|
for (let i = MIN_ACTIVITY_ID_SCAN; i <= MAX_ACTIVITY_ID_SCAN; i++) {
|
||||||
const batchEnd = Math.min(batchStart + BATCH_SIZE - 1, MAX_ACTIVITY_ID_SCAN);
|
|
||||||
const batchPromises: Promise<void>[] = [];
|
|
||||||
// Create fresh p-limit instance per batch to prevent internal queue accumulation
|
|
||||||
const limit = pLimit(CONCURRENT_API_CALLS);
|
|
||||||
|
|
||||||
logger.info(`Processing batch ${Math.floor(processedCount / BATCH_SIZE) + 1}/${Math.ceil(totalIds / BATCH_SIZE)} (IDs ${batchStart}-${batchEnd})`);
|
|
||||||
|
|
||||||
for (let i = batchStart; i <= batchEnd; i++) {
|
|
||||||
const activityId = String(i);
|
const activityId = String(i);
|
||||||
batchPromises.push(
|
|
||||||
limit(() =>
|
try {
|
||||||
processSingleActivity(activityId)
|
await processSingleActivity(activityId);
|
||||||
.then(() => {
|
|
||||||
successCount++;
|
successCount++;
|
||||||
processedCount++;
|
} catch (error) {
|
||||||
if (processedCount % 100 === 0) {
|
|
||||||
const mem = process.memoryUsage();
|
|
||||||
logger.info(`Progress: ${processedCount}/${totalIds} (${Math.round(processedCount/totalIds*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB`);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.catch((error: unknown) => {
|
|
||||||
errorCount++;
|
errorCount++;
|
||||||
processedCount++;
|
|
||||||
logger.error(`Error processing activity ID ${activityId}:`, error);
|
logger.error(`Error processing activity ID ${activityId}:`, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
processedCount++;
|
||||||
|
|
||||||
if (processedCount % 100 === 0) {
|
if (processedCount % 100 === 0) {
|
||||||
const mem = process.memoryUsage();
|
const mem = process.memoryUsage();
|
||||||
logger.info(`Progress: ${processedCount}/${totalIds} (${Math.round(processedCount/totalIds*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB`);
|
logger.info(`Progress: ${processedCount}/${totalIds} (${Math.round(processedCount/totalIds*100)}%) - Success: ${successCount}, Skipped: ${skippedCount}, Errors: ${errorCount} | Heap: ${Math.round(mem.heapUsed/1024/1024)}MB`);
|
||||||
}
|
}
|
||||||
})
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
await Promise.allSettled(batchPromises);
|
|
||||||
batchPromises.length = 0;
|
|
||||||
|
|
||||||
// Garbage collection hint and longer delay between batches to allow event loop to drain
|
|
||||||
if (global.gc) {
|
|
||||||
global.gc(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (batchEnd < MAX_ACTIVITY_ID_SCAN) {
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 500));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(`Initial club cache population finished.`);
|
logger.info(`Initial club cache population finished.`);
|
||||||
@@ -178,18 +148,16 @@ export async function initializeClubCache(): Promise<void> {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Update stale clubs in the cache
|
* Update stale clubs in the cache
|
||||||
|
* Processed sequentially in single-threaded mode
|
||||||
*/
|
*/
|
||||||
export async function updateStaleClubs(): Promise<void> {
|
export async function updateStaleClubs(): Promise<void> {
|
||||||
logger.info('Starting stale club check...');
|
logger.info('Starting stale club check (single-threaded mode)...');
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const updateIntervalMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000;
|
const updateIntervalMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000;
|
||||||
const limit = pLimit(CONCURRENT_API_CALLS);
|
|
||||||
const promises: Promise<void>[] = [];
|
|
||||||
const activityKeys = await getAllActivityKeys();
|
const activityKeys = await getAllActivityKeys();
|
||||||
|
|
||||||
for (const key of activityKeys) {
|
for (const key of activityKeys) {
|
||||||
const activityId = key.substring(ACTIVITY_KEY_PREFIX.length);
|
const activityId = key.substring(ACTIVITY_KEY_PREFIX.length);
|
||||||
promises.push(limit(async () => {
|
|
||||||
const cachedData = await getActivityData(activityId);
|
const cachedData = await getActivityData(activityId);
|
||||||
|
|
||||||
if (cachedData && cachedData.lastCheck) {
|
if (cachedData && cachedData.lastCheck) {
|
||||||
@@ -202,27 +170,10 @@ export async function updateStaleClubs(): Promise<void> {
|
|||||||
logger.info(`Activity ${activityId} not in cache or is empty object. Attempting to fetch...`);
|
logger.info(`Activity ${activityId} not in cache or is empty object. Attempting to fetch...`);
|
||||||
await processAndCacheActivity(activityId);
|
await processAndCacheActivity(activityId);
|
||||||
}
|
}
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
await cleanupOrphanedS3Images();
|
await cleanupOrphanedS3Images();
|
||||||
|
|
||||||
// Process promises in batches to prevent event loop blocking
|
|
||||||
const BATCH_SIZE = 50;
|
|
||||||
for (let i = 0; i < promises.length; i += BATCH_SIZE) {
|
|
||||||
const batch = promises.slice(i, i + BATCH_SIZE);
|
|
||||||
const batchNum = Math.floor(i / BATCH_SIZE) + 1;
|
|
||||||
const totalBatches = Math.ceil(promises.length / BATCH_SIZE);
|
|
||||||
|
|
||||||
await Promise.all(batch);
|
|
||||||
logger.info(`Stale check batch ${batchNum}/${totalBatches} complete`);
|
|
||||||
|
|
||||||
// Small delay between batches to prevent event loop blocking
|
|
||||||
if (i + BATCH_SIZE < promises.length) {
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 100));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info('Stale club check finished.');
|
logger.info('Stale club check finished.');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -164,26 +164,16 @@ export async function deleteS3Objects(objectKeysArray: string[]): Promise<boolea
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
// With Bun's S3Client, we need to delete objects one by one
|
|
||||||
// Process in batches of 100 for better performance
|
|
||||||
const BATCH_SIZE = 100;
|
|
||||||
let successCount = 0;
|
let successCount = 0;
|
||||||
let errorCount = 0;
|
let errorCount = 0;
|
||||||
|
|
||||||
for (let i = 0; i < objectKeysArray.length; i += BATCH_SIZE) {
|
for (const key of objectKeysArray) {
|
||||||
const batch = objectKeysArray.slice(i, i + BATCH_SIZE);
|
try {
|
||||||
// Process batch in parallel
|
await s3Client.delete(key);
|
||||||
const results = await Promise.allSettled(
|
|
||||||
batch.map(key => s3Client!.delete(key))
|
|
||||||
);
|
|
||||||
// Count successes and failures
|
|
||||||
for (const result of results) {
|
|
||||||
if (result.status === 'fulfilled') {
|
|
||||||
successCount++;
|
successCount++;
|
||||||
} else {
|
} catch (error) {
|
||||||
errorCount++;
|
errorCount++;
|
||||||
logger.error(`Failed to delete object: ${result.reason}`);
|
logger.error(`Failed to delete object ${key}:`, error);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.info(`Deleted ${successCount} objects from S3. Failed: ${errorCount}`);
|
logger.info(`Deleted ${successCount} objects from S3. Failed: ${errorCount}`);
|
||||||
|
|||||||
Reference in New Issue
Block a user