// services/cache-manager.mjs

import dotenv from 'dotenv';
import pLimit from 'p-limit';
import { fetchActivityData } from '../engage-api/get-activity.mjs';
import { structActivityData } from '../engage-api/struct-activity.mjs';
import { structStaffData } from '../engage-api/struct-staff.mjs';
import {
  getActivityData,
  setActivityData,
  getStaffData,
  setStaffData,
  getAllActivityKeys as getAllRedisActivityKeys, // Renamed import for clarity
  ACTIVITY_KEY_PREFIX
} from './redis-service.mjs';
import { uploadImageFromBase64, listS3Objects, deleteS3Objects, constructS3Url } from './s3-service.mjs';
import { extractBase64Image } from '../utils/image-processor.mjs';
import { logger } from '../utils/logger.mjs';

dotenv.config();
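
// Example .env (values are illustrative; only the variable names are read by
// this module, and the numeric values shown match the fallbacks used below):
//
//   API_USERNAME=engage-bot
//   API_PASSWORD=secret
//   MIN_ACTIVITY_ID_SCAN=0
//   MAX_ACTIVITY_ID_SCAN=9999
//   CONCURRENT_API_CALLS=10
//   CLUB_UPDATE_INTERVAL_MINS=60
//   STAFF_UPDATE_INTERVAL_MINS=60
//   FIXED_STAFF_ACTIVITY_ID=1234
//   S3_PUBLIC_URL_PREFIX=files
//   S3_ENDPOINT=https://s3.example.com/my-bucket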

const USERNAME = process.env.API_USERNAME;
const PASSWORD = process.env.API_PASSWORD;
const MIN_ACTIVITY_ID_SCAN = parseInt(process.env.MIN_ACTIVITY_ID_SCAN || '0', 10);
const MAX_ACTIVITY_ID_SCAN = parseInt(process.env.MAX_ACTIVITY_ID_SCAN || '9999', 10);
const CONCURRENT_API_CALLS = parseInt(process.env.CONCURRENT_API_CALLS || '10', 10);
const CLUB_UPDATE_INTERVAL_MINS = parseInt(process.env.CLUB_UPDATE_INTERVAL_MINS || '60', 10);
const STAFF_UPDATE_INTERVAL_MINS = parseInt(process.env.STAFF_UPDATE_INTERVAL_MINS || '60', 10);
const FIXED_STAFF_ACTIVITY_ID = process.env.FIXED_STAFF_ACTIVITY_ID;
const S3_IMAGE_PREFIX = (process.env.S3_PUBLIC_URL_PREFIX || 'files').replace(/\/$/, ''); // Ensure no trailing slash
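
// p-limit caps how many of the queued fetch/upload tasks run at once, keeping
// concurrent load on the Engage API (and S3) bounded.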
const limit = pLimit(CONCURRENT_API_CALLS);

async function processAndCacheActivity(activityId) {
  logger.debug(`Processing activity ID: ${activityId}`);
  try {
    const activityJson = await fetchActivityData(activityId, USERNAME, PASSWORD);
    let structuredActivity;

    if (!activityJson) {
      logger.info(`No data found for activity ID ${activityId} from the Engage API. Caching as empty.`);
      structuredActivity = {};
    } else {
      structuredActivity = await structActivityData(activityJson);
      if (structuredActivity && structuredActivity.photo && structuredActivity.photo.startsWith('data:image')) {
        const imageInfo = extractBase64Image(structuredActivity.photo);
        if (imageInfo) {
          const s3Url = await uploadImageFromBase64(imageInfo.base64Content, imageInfo.format, activityId);
          if (s3Url) {
            structuredActivity.photo = s3Url;
          } else {
            logger.warn(`Failed to upload image to S3 for activity ${activityId}; keeping the base64 photo data in the cache.`);
            // Potentially set photo to null or remove it if the upload is critical and fails
            // structuredActivity.photo = null; // Example
          }
        }
      }
    }

    structuredActivity.lastCheck = new Date().toISOString();
    await setActivityData(activityId, structuredActivity);
    // logger.info(`Successfully processed and cached activity ID: ${activityId}`); // Can be too verbose
    return structuredActivity;
  } catch (error) {
    logger.error(`Error processing activity ID ${activityId}:`, error);
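    // Cache an error marker with a fresh timestamp so this ID is retried on the next
    // stale-club pass (updateStaleClubs treats entries with an `error` field as stale).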
    const errorData = { lastCheck: new Date().toISOString(), error: "Failed to fetch or process" };
    await setActivityData(activityId, errorData);
    return errorData;
  }
}

export async function initializeClubCache() {
  logger.info(`Starting initial club cache population from ID ${MIN_ACTIVITY_ID_SCAN} to ${MAX_ACTIVITY_ID_SCAN}`);
  const promises = [];
  for (let i = MIN_ACTIVITY_ID_SCAN; i <= MAX_ACTIVITY_ID_SCAN; i++) {
    const activityId = String(i);
    promises.push(limit(async () => {
      const cachedData = await getActivityData(activityId);
      if (!cachedData || Object.keys(cachedData).length === 0 || !cachedData.lastCheck || cachedData.error) {
        logger.debug(`Initializing cache for activity ID: ${activityId}`);
        await processAndCacheActivity(activityId);
      }
    }));
  }
  await Promise.all(promises);
  logger.info('Initial club cache population finished.');
}

export async function updateStaleClubs() {
  logger.info('Starting stale club check...');
  const now = Date.now();
  const updateIntervalMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000;
  const promises = [];
  const activityKeys = await getAllRedisActivityKeys(); // More efficient than iterating 0-MAX_ID if many are empty

  for (const key of activityKeys) {
    const activityId = key.substring(ACTIVITY_KEY_PREFIX.length);
    promises.push(limit(async () => {
      const cachedData = await getActivityData(activityId); // Re-fetch to get the latest state before deciding
      if (cachedData && cachedData.lastCheck) {
        const lastCheckTime = new Date(cachedData.lastCheck).getTime();
        if ((now - lastCheckTime) > updateIntervalMs || cachedData.error) { // Also update if the previous fetch had an error
          logger.info(`Activity ${activityId} is stale or had an error. Updating...`);
          await processAndCacheActivity(activityId);
        }
      } else if (!cachedData || Object.keys(cachedData).length === 0) {
        // This case handles keys that were deleted or somehow became empty objects
        logger.info(`Activity ${activityId} not in cache or is an empty object. Attempting to fetch...`);
        await processAndCacheActivity(activityId);
      }
    }));
  }

  // Optionally, iterate MIN-MAX_ACTIVITY_ID_SCAN for any IDs not yet in Redis (newly
  // created activities). That pass can be heavy, and getAllRedisActivityKeys is usually
  // sufficient for staleness checks; truly new activities are picked up on a direct
  // API call or a less frequent full scan (see the sketch after this function).

  await Promise.all(promises);
  logger.info('Stale club check finished.');
}
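
// A minimal sketch of that optional full-range scan, built from the helpers above.
// `scanForNewActivities` is a hypothetical name, not part of the original module,
// and is not wired into any schedule here. It only fetches IDs with no cache entry
// at all, so it is cheaper than a full refresh, but it still issues up to
// (MAX - MIN + 1) Redis reads per run.
export async function scanForNewActivities() {
  const promises = [];
  for (let i = MIN_ACTIVITY_ID_SCAN; i <= MAX_ACTIVITY_ID_SCAN; i++) {
    const activityId = String(i);
    promises.push(limit(async () => {
      const cachedData = await getActivityData(activityId);
      if (!cachedData) { // Skip anything already cached, including empty/error markers
        await processAndCacheActivity(activityId);
      }
    }));
  }
  await Promise.all(promises);
  logger.info('New-activity scan finished.');
}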

export async function initializeOrUpdateStaffCache(forceUpdate = false) {
  logger.info('Starting staff cache check/update...');
  try {
    const cachedStaffData = await getStaffData();
    const now = Date.now();
    const updateIntervalMs = STAFF_UPDATE_INTERVAL_MINS * 60 * 1000;
    let needsUpdate = forceUpdate;

    if (!cachedStaffData || !cachedStaffData.lastCheck) {
      needsUpdate = true;
    } else {
      const lastCheckTime = new Date(cachedStaffData.lastCheck).getTime();
      if ((now - lastCheckTime) > updateIntervalMs) {
        needsUpdate = true;
      }
    }

    if (needsUpdate) {
      logger.info('Staff data needs update. Fetching...');
      const activityJson = await fetchActivityData(FIXED_STAFF_ACTIVITY_ID, USERNAME, PASSWORD);
      if (activityJson) {
        const staffMap = await structStaffData(activityJson);
        const staffObject = Object.fromEntries(staffMap);
        staffObject.lastCheck = new Date().toISOString();
        await setStaffData(staffObject);
        logger.info('Staff data updated and cached.');
      } else {
        logger.warn(`Could not retrieve base data for staff (activity ID ${FIXED_STAFF_ACTIVITY_ID}).`);
        if (cachedStaffData && cachedStaffData.lastCheck) { // If old data exists, just refresh its lastCheck
          cachedStaffData.lastCheck = new Date().toISOString();
          await setStaffData(cachedStaffData);
        }
      }
    } else {
      logger.info('Staff data is up-to-date.');
    }
  } catch (error) {
    logger.error('Error initializing or updating staff cache:', error);
  }
}

export async function cleanupOrphanedS3Images() {
  logger.info('Starting S3 orphan image cleanup...');
  const s3ObjectListPrefix = S3_IMAGE_PREFIX ? `${S3_IMAGE_PREFIX}/` : ''; // Ensure trailing slash for prefix listing

  try {
    const referencedS3Urls = new Set();
    const allActivityRedisKeys = await getAllRedisActivityKeys();

    for (const redisKey of allActivityRedisKeys) {
      const activityId = redisKey.substring(ACTIVITY_KEY_PREFIX.length);
      const activityData = await getActivityData(activityId); // Fetch by ID
      if (activityData && typeof activityData.photo === 'string' && activityData.photo.startsWith('http')) { // Assuming S3 URLs start with http/https
        // Check if the photo URL matches the expected S3 endpoint structure
        if (activityData.photo.startsWith(process.env.S3_ENDPOINT)) {
          referencedS3Urls.add(activityData.photo);
        }
      }
    }
    logger.info(`Found ${referencedS3Urls.size} unique S3 URLs referenced in Redis.`);

    const s3ObjectKeys = await listS3Objects(s3ObjectListPrefix);
    if (!s3ObjectKeys || s3ObjectKeys.length === 0) {
      logger.info(`No images found in S3 under prefix "${s3ObjectListPrefix}". Nothing to clean up.`);
      return;
    }
    logger.debug(`Found ${s3ObjectKeys.length} objects in S3 under prefix "${s3ObjectListPrefix}".`);

    const orphanedObjectKeys = [];
    for (const objectKey of s3ObjectKeys) {
      const s3Url = constructS3Url(objectKey); // Construct the URL from the key to compare
      if (!referencedS3Urls.has(s3Url)) {
        orphanedObjectKeys.push(objectKey);
      }
    }

    if (orphanedObjectKeys.length > 0) {
      logger.info(`Found ${orphanedObjectKeys.length} orphaned S3 objects to delete. Submitting deletion...`);
      // orphanedObjectKeys.forEach(key => logger.debug(`Orphaned key: ${key}`)); // For verbose debugging
      await deleteS3Objects(orphanedObjectKeys);
    } else {
      logger.info('No orphaned S3 images found after comparison.');
    }

    logger.info('S3 orphan image cleanup finished.');
  } catch (error) {
    logger.error('Error during S3 orphan image cleanup:', error);
  }
}
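
// A minimal usage sketch (illustrative only; the intervals are hypothetical and
// the scheduling below is not part of this module):
//
//   import {
//     initializeClubCache,
//     updateStaleClubs,
//     initializeOrUpdateStaffCache,
//     cleanupOrphanedS3Images,
//   } from './services/cache-manager.mjs';
//
//   await initializeClubCache();
//   await initializeOrUpdateStaffCache();
//   setInterval(updateStaleClubs, 10 * 60 * 1000);              // e.g. sweep every 10 min
//   setInterval(cleanupOrphanedS3Images, 24 * 60 * 60 * 1000);  // e.g. daily cleanup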