feat: redis cache and detach image into s3

This commit is contained in:
JamesFlare1212
2025-05-09 19:43:01 -04:00
parent 99d1ee0a1e
commit f7252345f3
14 changed files with 2302 additions and 202 deletions

209
services/cache-manager.mjs Normal file
View File

@@ -0,0 +1,209 @@
// services/cache-manager.mjs
import dotenv from 'dotenv';
import pLimit from 'p-limit';
import { fetchActivityData } from '../engage-api/get-activity.mjs';
import { structActivityData } from '../engage-api/struct-activity.mjs';
import { structStaffData } from '../engage-api/struct-staff.mjs';
import {
getActivityData,
setActivityData,
getStaffData,
setStaffData,
getAllActivityKeys as getAllRedisActivityKeys, // Renamed import for clarity
ACTIVITY_KEY_PREFIX
} from './redis-service.mjs';
import { uploadImageFromBase64, listS3Objects, deleteS3Objects, constructS3Url } from './s3-service.mjs';
import { extractBase64Image } from '../utils/image-processor.mjs';
import { logger } from '../utils/logger.mjs';
dotenv.config();

/**
 * Reads an integer setting from `process.env`.
 *
 * Returns `fallback` when the variable is unset, empty, not parseable as a
 * base-10 integer, or smaller than `min`. Without this, a typo in the
 * environment would turn the setting into NaN.
 *
 * @param {string} name Environment variable name.
 * @param {number} fallback Value used when the variable is missing or invalid.
 * @param {number} [min] Smallest accepted value (no lower bound by default).
 * @returns {number} The parsed integer or the fallback.
 */
function readIntEnv(name, fallback, min = Number.NEGATIVE_INFINITY) {
  const raw = process.env[name];
  if (!raw) {
    return fallback;
  }
  const parsed = Number.parseInt(raw, 10);
  if (Number.isNaN(parsed) || parsed < min) {
    logger.warn(`Invalid value "${raw}" for ${name}; using default ${fallback}.`);
    return fallback;
  }
  return parsed;
}

// Credentials passed to fetchActivityData.
const USERNAME = process.env.API_USERNAME;
const PASSWORD = process.env.API_PASSWORD;
// Highest activity ID visited by the initial cache population (inclusive).
const MAX_ACTIVITY_ID_SCAN = readIntEnv('MAX_ACTIVITY_ID_SCAN', 9999);
// Must be at least 1: this value is handed to pLimit below.
const CONCURRENT_API_CALLS = readIntEnv('CONCURRENT_API_CALLS', 10, 1);
// Age (in minutes) after which a cached entry is refreshed.
const CLUB_UPDATE_INTERVAL_MINS = readIntEnv('CLUB_UPDATE_INTERVAL_MINS', 60);
const STAFF_UPDATE_INTERVAL_MINS = readIntEnv('STAFF_UPDATE_INTERVAL_MINS', 60);
// Activity ID whose data is used to build the staff cache.
const FIXED_STAFF_ACTIVITY_ID = process.env.FIXED_STAFF_ACTIVITY_ID;
// Key prefix for images in S3, with a single trailing slash removed.
const S3_IMAGE_PREFIX = (process.env.S3_PUBLIC_URL_PREFIX || 'files').replace(/\/$/, '');
// Shared limiter that caps how many cache tasks run at the same time.
const limit = pLimit(CONCURRENT_API_CALLS);
/**
 * Fetches one activity, moves an inline base64 photo to S3 when present,
 * stamps the result with `lastCheck` and writes it to the cache.
 *
 * When the fetch yields nothing, an object containing only `lastCheck` is
 * cached. When anything throws, an error marker object is cached instead.
 *
 * @param {string} activityId ID of the activity to process.
 * @returns {Promise<object>} The object that was written to the cache.
 */
async function processAndCacheActivity(activityId) {
  logger.debug(`Processing activity ID: ${activityId}`);
  try {
    const rawActivity = await fetchActivityData(activityId, USERNAME, PASSWORD);
    let record = {};

    if (rawActivity) {
      record = await structActivityData(rawActivity);

      const hasInlinePhoto = record && record.photo && record.photo.startsWith('data:image');
      const parsedImage = hasInlinePhoto ? extractBase64Image(record.photo) : null;

      if (parsedImage) {
        const uploadedUrl = await uploadImageFromBase64(
          parsedImage.base64Content,
          parsedImage.format,
          activityId,
        );
        if (uploadedUrl) {
          // Replace the inline data URI with the public S3 URL.
          record.photo = uploadedUrl;
        } else {
          // The photo field is left untouched when the upload fails.
          logger.warn(`Failed to upload image to S3 for activity ${activityId}. Base64 photo data will be removed or kept as is depending on structActivityData's behavior if upload fails.`);
        }
      }
    } else {
      logger.info(`No data found for activity ID ${activityId} from engage API. Caching as empty.`);
    }

    record.lastCheck = new Date().toISOString();
    await setActivityData(activityId, record);
    return record;
  } catch (error) {
    logger.error(`Error processing activity ID ${activityId}:`, error);
    const failureMarker = {
      lastCheck: new Date().toISOString(),
      error: "Failed to fetch or process",
    };
    await setActivityData(activityId, failureMarker);
    return failureMarker;
  }
}
/**
 * Populates the activity cache for every ID from 0 to MAX_ACTIVITY_ID_SCAN
 * (inclusive). An ID is processed only when its cached entry is missing,
 * empty, lacks `lastCheck`, or carries an `error` marker. Work is scheduled
 * through the shared limiter.
 *
 * @returns {Promise<void>}
 */
export async function initializeClubCache() {
  logger.info(`Starting initial club cache population up to ID ${MAX_ACTIVITY_ID_SCAN}...`);

  const ensureCached = async (activityId) => {
    const existing = await getActivityData(activityId);
    const isUsable =
      existing &&
      Object.keys(existing).length > 0 &&
      existing.lastCheck &&
      !existing.error;
    if (isUsable) {
      return;
    }
    logger.debug(`Initializing cache for activity ID: ${activityId}`);
    await processAndCacheActivity(activityId);
  };

  const tasks = Array.from({ length: MAX_ACTIVITY_ID_SCAN + 1 }, (_, index) => {
    const activityId = String(index);
    return limit(() => ensureCached(activityId));
  });

  await Promise.all(tasks);
  logger.info('Initial club cache population finished.');
}
/**
 * Refreshes cached activities that are stale or previously failed.
 *
 * Only IDs that already have a key in Redis are visited. An entry with a
 * `lastCheck` is refreshed when it is older than CLUB_UPDATE_INTERVAL_MINS or
 * carries an `error` marker. An entry that is missing or an empty object is
 * fetched again. IDs without a key are not discovered here.
 *
 * @returns {Promise<void>}
 */
export async function updateStaleClubs() {
  logger.info('Starting stale club check...');
  const startedAt = Date.now();
  const maxAgeMs = CLUB_UPDATE_INTERVAL_MINS * 60 * 1000;
  const redisKeys = await getAllRedisActivityKeys();

  const refreshIfNeeded = async (activityId) => {
    // Read the entry again right before deciding, so the decision uses current data.
    const entry = await getActivityData(activityId);

    if (entry && entry.lastCheck) {
      const ageMs = startedAt - new Date(entry.lastCheck).getTime();
      if (ageMs > maxAgeMs || entry.error) {
        logger.info(`Activity ${activityId} is stale or had error. Updating...`);
        await processAndCacheActivity(activityId);
      }
      return;
    }

    // The key disappeared or holds an empty object.
    if (!entry || Object.keys(entry).length === 0) {
      logger.info(`Activity ${activityId} not in cache or is empty object. Attempting to fetch...`);
      await processAndCacheActivity(activityId);
    }
  };

  const tasks = redisKeys.map((redisKey) => {
    const activityId = redisKey.substring(ACTIVITY_KEY_PREFIX.length);
    return limit(() => refreshIfNeeded(activityId));
  });

  await Promise.all(tasks);
  logger.info('Stale club check finished.');
}
/**
 * Ensures the cached staff data exists and is not older than
 * STAFF_UPDATE_INTERVAL_MINS.
 *
 * A refresh happens when `forceUpdate` is truthy, when nothing with a
 * `lastCheck` is cached, or when the cached `lastCheck` is too old. If the
 * fetch yields nothing but old data exists, only its `lastCheck` is renewed.
 * Errors are logged and not rethrown.
 *
 * @param {boolean} [forceUpdate=false] Refresh regardless of the cached age.
 * @returns {Promise<void>}
 */
export async function initializeOrUpdateStaffCache(forceUpdate = false) {
  logger.info('Starting staff cache check/update...');
  try {
    const existing = await getStaffData();
    const now = Date.now();
    const maxAgeMs = STAFF_UPDATE_INTERVAL_MINS * 60 * 1000;

    const hasTimestamp = Boolean(existing && existing.lastCheck);
    const isExpired =
      hasTimestamp && (now - new Date(existing.lastCheck).getTime()) > maxAgeMs;

    if (!forceUpdate && hasTimestamp && !isExpired) {
      logger.info('Staff data is up-to-date.');
      return;
    }

    logger.info('Staff data needs update. Fetching...');
    const rawStaff = await fetchActivityData(FIXED_STAFF_ACTIVITY_ID, USERNAME, PASSWORD);

    if (!rawStaff) {
      logger.warn(`Could not retrieve base data for staff (activity ID ${FIXED_STAFF_ACTIVITY_ID}).`);
      if (hasTimestamp) {
        // Keep the old data and renew its timestamp.
        existing.lastCheck = new Date().toISOString();
        await setStaffData(existing);
      }
      return;
    }

    const staffEntries = await structStaffData(rawStaff);
    const staffRecord = Object.fromEntries(staffEntries);
    staffRecord.lastCheck = new Date().toISOString();
    await setStaffData(staffRecord);
    logger.info('Staff data updated and cached.');
  } catch (error) {
    logger.error('Error initializing or updating staff cache:', error);
  }
}
/**
 * Deletes S3 images under the configured prefix that no cached activity
 * references.
 *
 * A photo counts as referenced when it is a string starting with "http" and
 * with the S3_ENDPOINT environment value. Object keys from S3 are converted
 * to URLs with constructS3Url and compared against that set.
 *
 * Safety: the cleanup is skipped when no activity keys are found in Redis.
 * The key lookup yields an empty list both for an empty cache and for a Redis
 * failure, and continuing would classify every listed image as orphaned.
 *
 * Errors are logged and not rethrown.
 *
 * @returns {Promise<void>}
 */
export async function cleanupOrphanedS3Images() {
  logger.info('Starting S3 orphan image cleanup...');
  // Trailing slash so the listing is restricted to objects under the prefix.
  const s3ObjectListPrefix = S3_IMAGE_PREFIX ? `${S3_IMAGE_PREFIX}/` : '';
  // Read once; the value does not change while the loop below runs.
  const s3Endpoint = process.env.S3_ENDPOINT;
  try {
    const allActivityRedisKeys = await getAllRedisActivityKeys();
    if (!allActivityRedisKeys || allActivityRedisKeys.length === 0) {
      logger.warn('No activity keys found in Redis. Skipping S3 orphan cleanup so that images are not deleted while the cache is empty or unreachable.');
      return;
    }

    const referencedS3Urls = new Set();
    for (const redisKey of allActivityRedisKeys) {
      const activityId = redisKey.substring(ACTIVITY_KEY_PREFIX.length);
      const activityData = await getActivityData(activityId);
      const photo = activityData ? activityData.photo : undefined;
      // Only URLs on the configured S3 endpoint are treated as references.
      if (
        typeof photo === 'string' &&
        photo.startsWith('http') &&
        s3Endpoint &&
        photo.startsWith(s3Endpoint)
      ) {
        referencedS3Urls.add(photo);
      }
    }
    logger.info(`Found ${referencedS3Urls.size} unique S3 URLs referenced in Redis.`);

    const s3ObjectKeys = await listS3Objects(s3ObjectListPrefix);
    if (!s3ObjectKeys || s3ObjectKeys.length === 0) {
      logger.info(`No images found in S3 under prefix "${s3ObjectListPrefix}". Nothing to clean up.`);
      return;
    }
    logger.debug(`Found ${s3ObjectKeys.length} objects in S3 under prefix "${s3ObjectListPrefix}".`);

    // Compare by full URL, because that is what the cache stores.
    const orphanedObjectKeys = s3ObjectKeys.filter(
      (objectKey) => !referencedS3Urls.has(constructS3Url(objectKey)),
    );

    if (orphanedObjectKeys.length > 0) {
      logger.info(`Found ${orphanedObjectKeys.length} orphaned S3 objects to delete. Submitting deletion...`);
      await deleteS3Objects(orphanedObjectKeys);
    } else {
      logger.info('No orphaned S3 images found after comparison.');
    }
    logger.info('S3 orphan image cleanup finished.');
  } catch (error) {
    logger.error('Error during S3 orphan image cleanup:', error);
  }
}

135
services/redis-service.mjs Normal file
View File

@@ -0,0 +1,135 @@
// services/redis-service.mjs
import Redis from 'ioredis';
import dotenv from 'dotenv';
import { logger } from '../utils/logger.mjs';
dotenv.config();

const redisUrl = process.env.REDIS_URL;

// Stays undefined when REDIS_URL is missing or the client cannot be constructed.
let redisClient;

// Prefix of every activity key; exported so callers can strip it from scanned keys.
export const ACTIVITY_KEY_PREFIX = 'activity:';
// Single key under which the staff data is stored.
const STAFF_KEY = 'staffs:all';

try {
  if (!redisUrl) {
    logger.error('REDIS_URL not defined. Redis client not initialized.');
  } else {
    redisClient = new Redis(redisUrl);
    redisClient.on('connect', () => {
      logger.info('Connected to Redis successfully!');
    });
    redisClient.on('error', (connectionError) => {
      logger.error('Redis connection error:', connectionError);
    });
  }
} catch (error) {
  logger.error('Failed to initialize Redis client:', error);
}
/**
 * Reads one activity from Redis and parses it as JSON.
 * @param {string} activityId ID appended to the activity key prefix.
 * @returns {Promise<object|null>} The parsed value, or null when the client is
 *   unavailable, nothing is stored, or reading/parsing fails.
 */
export async function getActivityData(activityId) {
  if (!redisClient) {
    logger.warn('Redis client not available, skipping getActivityData');
    return null;
  }

  try {
    const raw = await redisClient.get(`${ACTIVITY_KEY_PREFIX}${activityId}`);
    if (!raw) {
      return null;
    }
    return JSON.parse(raw);
  } catch (err) {
    logger.error(`Error getting activity ${activityId} from Redis:`, err);
    return null;
  }
}
/**
 * Serializes an activity object to JSON and stores it in Redis.
 * Failures are logged and not rethrown.
 * @param {string} activityId ID appended to the activity key prefix.
 * @param {object} data The activity data object to store.
 * @returns {Promise<void>}
 */
export async function setActivityData(activityId, data) {
  if (!redisClient) {
    logger.warn('Redis client not available, skipping setActivityData');
    return;
  }

  try {
    const redisKey = `${ACTIVITY_KEY_PREFIX}${activityId}`;
    const payload = JSON.stringify(data);
    await redisClient.set(redisKey, payload);
  } catch (err) {
    logger.error(`Error setting activity ${activityId} in Redis:`, err);
  }
}
/**
 * Reads the staff data from Redis and parses it as JSON.
 * @returns {Promise<object|null>} The parsed value, or null when the client is
 *   unavailable, nothing is stored, or reading/parsing fails.
 */
export async function getStaffData() {
  if (!redisClient) {
    logger.warn('Redis client not available, skipping getStaffData');
    return null;
  }

  try {
    const raw = await redisClient.get(STAFF_KEY);
    if (!raw) {
      return null;
    }
    return JSON.parse(raw);
  } catch (err) {
    logger.error('Error getting staff data from Redis:', err);
    return null;
  }
}
/**
 * Serializes the staff data to JSON and stores it in Redis.
 * Failures are logged and not rethrown.
 * @param {object} data The staff data object to store.
 * @returns {Promise<void>}
 */
export async function setStaffData(data) {
  if (!redisClient) {
    logger.warn('Redis client not available, skipping setStaffData');
    return;
  }

  try {
    const payload = JSON.stringify(data);
    await redisClient.set(STAFF_KEY, payload);
  } catch (err) {
    logger.error('Error setting staff data in Redis:', err);
  }
}
/**
 * Collects every activity key from Redis by iterating with SCAN.
 *
 * SCAN may report the same key more than once during one iteration, so the
 * keys are gathered in a Set and each key is returned only once. This can
 * still be resource-intensive on large datasets.
 *
 * @returns {Promise<string[]>} Unique activity keys in first-seen order; an
 *   empty array when the client is unavailable or the scan fails.
 */
export async function getAllActivityKeys() {
  if (!redisClient) {
    logger.warn('Redis client not available, skipping getAllActivityKeys');
    return [];
  }
  try {
    // SCAN is used instead of KEYS so the lookup proceeds in small batches.
    const uniqueKeys = new Set();
    let cursor = '0';
    do {
      const [nextCursor, foundKeys] = await redisClient.scan(
        cursor,
        'MATCH',
        `${ACTIVITY_KEY_PREFIX}*`,
        'COUNT',
        '100',
      );
      for (const key of foundKeys) {
        uniqueKeys.add(key);
      }
      // Normalise to a string so the termination check below is reliable.
      cursor = String(nextCursor);
    } while (cursor !== '0');

    const keys = [...uniqueKeys];
    logger.info(`Found ${keys.length} activity keys in Redis using SCAN.`);
    return keys;
  } catch (err) {
    logger.error('Error getting all activity keys from Redis using SCAN:', err);
    return [];
  }
}
/**
 * Exposes the module's Redis client.
 * @returns {object|undefined} The client held in `redisClient`; undefined when
 *   it was never assigned.
 */
export function getRedisClient() {
  const client = redisClient;
  return client;
}

186
services/s3-service.mjs Normal file
View File

@@ -0,0 +1,186 @@
// services/s3-service.mjs
import { S3Client, PutObjectCommand, ListObjectsV2Command, DeleteObjectsCommand } from '@aws-sdk/client-s3';
import { v4 as uuidv4 } from 'uuid';
import dotenv from 'dotenv';
import { logger } from '../utils/logger.mjs';
import { decodeBase64Image } from '../utils/image-processor.mjs';
dotenv.config();

const {
  S3_ENDPOINT,
  S3_REGION,
  S3_ACCESS_KEY_ID,
  S3_SECRET_ACCESS_KEY,
} = process.env;
const BUCKET_NAME = process.env.S3_BUCKET_NAME;
// Key prefix for uploaded files, with a single trailing slash removed.
const PUBLIC_URL_FILE_PREFIX = (process.env.S3_PUBLIC_URL_PREFIX || 'files').replace(/\/$/, '');

// Stays undefined unless every required setting is present.
let s3Client;

const hasCompleteS3Config = Boolean(
  S3_ENDPOINT && S3_REGION && S3_ACCESS_KEY_ID && S3_SECRET_ACCESS_KEY && BUCKET_NAME,
);

if (!hasCompleteS3Config) {
  logger.warn('S3 client configuration is incomplete. S3 operations will be disabled.');
} else {
  s3Client = new S3Client({
    endpoint: S3_ENDPOINT,
    region: S3_REGION,
    credentials: {
      accessKeyId: S3_ACCESS_KEY_ID,
      secretAccessKey: S3_SECRET_ACCESS_KEY,
    },
    // Path-style addressing; needed by MinIO and some S3-compatible services.
    forcePathStyle: true,
  });
}
/**
 * Decodes a base64 image and uploads it to S3 with a public-read ACL.
 *
 * The object key is `<prefix>/activity-<activityId>-<uuid>.<originalFormat>`
 * and the content type is `image/<originalFormat>`.
 *
 * @param {string} base64Data The base64 content (without the data URI prefix).
 * @param {string} originalFormat The image format (e.g., 'png', 'jpeg').
 * @param {string} activityId The activity ID, used in the object key.
 * @returns {Promise<string|null>} The URL built by constructS3Url, or null when
 *   the client is not configured, an argument is missing, or the upload fails.
 */
export async function uploadImageFromBase64(base64Data, originalFormat, activityId) {
  if (!s3Client) {
    logger.warn('S3 client not configured. Cannot upload image.');
    return null;
  }

  const hasAllInputs = base64Data && originalFormat && activityId;
  if (!hasAllInputs) {
    logger.error('S3 Upload: Missing base64Data, originalFormat, or activityId');
    return null;
  }

  try {
    const body = decodeBase64Image(base64Data);
    // A fresh UUID in the key means each upload creates a new object.
    const objectKey = `${PUBLIC_URL_FILE_PREFIX}/activity-${activityId}-${uuidv4()}.${originalFormat}`;

    const putCommand = new PutObjectCommand({
      Bucket: BUCKET_NAME,
      Key: objectKey,
      Body: body,
      ContentType: `image/${originalFormat}`,
      ACL: 'public-read',
    });
    await s3Client.send(putCommand);

    const publicUrl = constructS3Url(objectKey);
    logger.info(`Image uploaded to S3: ${publicUrl}`);
    return publicUrl;
  } catch (error) {
    logger.error(`S3 Upload Error for activity ${activityId}:`, error);
    return null;
  }
}
/**
 * Lists the keys of all objects in the bucket under a prefix, following
 * continuation tokens until the listing is no longer truncated. Keys ending
 * in '/' are skipped.
 * @param {string} prefix The prefix to filter objects by (e.g., S3_PUBLIC_URL_PREFIX + '/').
 * @returns {Promise<Array<string>>} The object keys; an empty array when the
 *   client is not configured or any request fails.
 */
export async function listS3Objects(prefix) {
  if (!s3Client) {
    logger.warn('S3 client not configured. Cannot list objects.');
    return [];
  }

  logger.debug(`Listing objects from S3 with prefix: "${prefix}"`);

  const collectedKeys = [];
  // One request object is reused; only the continuation token changes per page.
  const request = { Bucket: BUCKET_NAME, Prefix: prefix };
  let pageToken;
  let hasMorePages;

  try {
    do {
      if (pageToken) {
        request.ContinuationToken = pageToken;
      }
      const page = await s3Client.send(new ListObjectsV2Command(request));

      for (const item of page.Contents || []) {
        // Keys ending in '/' are pseudo-directories, not files.
        if (item.Key && !item.Key.endsWith('/')) {
          collectedKeys.push(item.Key);
        }
      }

      hasMorePages = page.IsTruncated;
      pageToken = page.NextContinuationToken;
    } while (hasMorePages);

    logger.info(`Listed ${collectedKeys.length} object keys from S3 with prefix "${prefix}"`);
    return collectedKeys;
  } catch (error) {
    logger.error(`S3 ListObjects Error with prefix "${prefix}":`, error);
    return [];
  }
}
/**
 * Deletes objects from S3 in batches of at most 1000 keys per request.
 * @param {Array<string>} objectKeysArray Array of object keys to delete.
 * @returns {Promise<boolean>} True when there was nothing to delete or every
 *   batch completed without reported errors; false when the client is not
 *   configured, a request throws, or any key is reported as failed.
 */
export async function deleteS3Objects(objectKeysArray) {
  if (!s3Client) {
    logger.warn('S3 client not configured. Cannot delete objects.');
    return false;
  }
  if (!objectKeysArray || objectKeysArray.length === 0) {
    logger.info('No objects to delete from S3.');
    return true;
  }

  // Upper bound on keys per delete request.
  const BATCH_SIZE = 1000;
  let sawFailure = false;

  for (let start = 0; start < objectKeysArray.length; start += BATCH_SIZE) {
    const batch = objectKeysArray
      .slice(start, start + BATCH_SIZE)
      .map((key) => ({ Key: key }));
    const request = {
      Bucket: BUCKET_NAME,
      // Quiet is off so that per-key results are included in the response.
      Delete: { Objects: batch, Quiet: false },
    };

    try {
      const output = await s3Client.send(new DeleteObjectsCommand(request));
      const failures = output.Errors;
      const removed = output.Deleted;

      if (failures && failures.length > 0) {
        sawFailure = true;
        for (const err of failures) {
          logger.error(`S3 Delete Error for key ${err.Key}: ${err.Message}`);
        }
      }
      if (removed && removed.length > 0) {
        logger.info(`Successfully submitted deletion for ${removed.length} objects from S3 chunk (some might have failed, check individual errors).`);
      }
    } catch (error) {
      logger.error('S3 DeleteObjects Command Error for a chunk:', error);
      sawFailure = true;
    }
  }

  if (objectKeysArray.length > 0) {
    if (sawFailure) {
      logger.warn(`S3 deletion request for ${objectKeysArray.length} keys completed with some errors.`);
    } else {
      logger.info(`Finished S3 deletion request for ${objectKeysArray.length} keys.`);
    }
  }
  return !sawFailure;
}
/**
 * Builds the URL `<endpoint>/<bucket>/<key>` for an object key.
 *
 * One trailing slash is removed from the endpoint, one leading and one
 * trailing slash from the bucket name, and one leading slash from the key,
 * so the parts join with exactly one slash between them.
 *
 * @param {string} objectKey The key of the object in S3.
 * @returns {string} The full public URL.
 */
export function constructS3Url(objectKey) {
  const urlParts = [
    S3_ENDPOINT.replace(/\/$/, ''),
    BUCKET_NAME.replace(/^\//, '').replace(/\/$/, ''),
    objectKey.replace(/^\//, ''),
  ];
  return urlParts.join('/');
}