mixins_batch-loading-mixin.js

const { defaultLogger: logger } = require("../utils/logger");
const { ObjectNotFound } = require("../object-not-found");
const { pluginManager } = require("../plugin-manager");
const { BatchGetCommand, GetCommand } = require("@aws-sdk/lib-dynamodb");

// Move these constants from model.js to here
const BATCH_REQUESTS = new Map(); // testId -> { modelName-delay -> batch }
const DEFAULT_BATCH_DELAY_MS = 5;
const BATCH_REQUEST_TIMEOUT = 10000; // 10 seconds max lifetime for a batch

const BatchLoadingMethods = {
  _getBatchRequests() {
    const testId = this._testId || "default";
    if (!BATCH_REQUESTS.has(testId)) {
      BATCH_REQUESTS.set(testId, new Map());
    }
    return BATCH_REQUESTS.get(testId);
  },

  /**
   * @memberof BaoModel
   * @description
   * This is the primary way to load multiple objects given an array of ids.
   * This function should only be used when {@link BaoModel.find} or {@link BaoModel#loadRelatedData} is not sufficient.
   *
   * @param {string[]} primaryIds - The primary IDs of the items to load
   * @param {Object} [loaderContext] - Cache context for storing and retrieving items across requests.
   * @returns {Promise<Object>} Returns a promise that resolves to the loaded items and their consumed capacity
   */
  async batchFind(primaryIds, loaderContext = null) {
    if (!primaryIds?.length) return { items: {}, ConsumedCapacity: [] };

    // Add retry wrapper function
    const retryOperation = async (operation, maxRetries = 3) => {
      for (let attempt = 0; attempt < maxRetries; attempt++) {
        try {
          return await operation();
        } catch (error) {
          if (attempt === maxRetries - 1) throw error;

          // Check if it's a network-related error
          if (
            error.name === "TimeoutError" ||
            error.code === "NetworkingError" ||
            error.message.includes("getaddrinfo ENOTFOUND")
          ) {
            const delay = Math.pow(2, attempt) * 100; // exponential backoff
            await new Promise((resolve) => setTimeout(resolve, delay));
            continue;
          }

          throw error; // rethrow non-network errors immediately
        }
      }
    };

    // Initialize results object
    const results = {};
    let idsToLoad = [];

    // First check loaderContext for existing items
    if (loaderContext) {
      primaryIds.forEach((id) => {
        if (loaderContext[id]) {
          const instance = this._createFromDyItem(loaderContext[id]._dyData);
          results[id] = instance;
        } else {
          idsToLoad.push(id);
        }
      });
    } else {
      idsToLoad.push(...primaryIds);
    }

    // If all items were in context, return early
    if (!idsToLoad.length) {
      return { items: results, ConsumedCapacity: [] };
    }

    const consumedCapacity = [];

    // remove duplicates from idsToLoad
    idsToLoad = [...new Set(idsToLoad)];

    // Process items in batches of 100
    for (let i = 0; i < idsToLoad.length; i += 100) {
      const batchIds = idsToLoad.slice(i, i + 100);
      const Keys = batchIds.map((id) => {
        const pkSk = this._parsePrimaryId(id);
        return this._getDyKeyForPkSk(pkSk);
      });

      let unprocessedKeys = Keys;
      const maxRetries = 3;
      let retryCount = 0;

      while (unprocessedKeys.length > 0 && retryCount < maxRetries) {
        // Wrap the batchGet call with our retry function
        const batchResult = await retryOperation(() =>
          this.documentClient.send(
            new BatchGetCommand({
              RequestItems: {
                [this.table]: {
                  Keys: unprocessedKeys,
                },
              },
              ReturnConsumedCapacity: "TOTAL",
            }),
          ),
        );

        // Process successful items
        if (batchResult.Responses?.[this.table]) {
          batchResult.Responses[this.table].forEach((item) => {
            const instance = this._createFromDyItem(item);
            const primaryId = instance.getPrimaryId();
            results[primaryId] = instance;

            // Add to loader context if provided
            if (loaderContext) {
              loaderContext[primaryId] = instance;
            }
          });
        }

        // Track consumed capacity
        if (batchResult.ConsumedCapacity) {
          consumedCapacity.push(...[].concat(batchResult.ConsumedCapacity));
        }

        // Handle unprocessed keys
        unprocessedKeys = batchResult.UnprocessedKeys?.[this.table]?.Keys || [];

        if (unprocessedKeys.length > 0) {
          retryCount++;
          // Add exponential backoff if needed
          await new Promise((resolve) =>
            setTimeout(resolve, Math.pow(2, retryCount) * 100),
          );
        }
      }

      // If we still have unprocessed keys after retries, log a warning
      if (unprocessedKeys.length > 0) {
        console.warn(
          `Failed to process ${unprocessedKeys.length} items after ${maxRetries} retries`,
        );
      }
    }

    return {
      items: results,
      ConsumedCapacity: consumedCapacity,
    };
  },

  /**
   *@memberof BaoModel
   *
   * @description
   * Find is the primary way to look up an object given its id. It will return the object
   * if it exists, or an {@link ObjectNotFound} instance if it does not. Find supports
   * efficient batch loading and caching. In general, this function should be
   * preferred over {@link BaoModel.batchFind}. Find uses batchFind internally, unless batchDelay
   * is set to 0.
   *
   * @param {string} primaryId - The primary ID of the item to find
   * @param {Object} [options={}] - Optional configuration for the find operation
   * @param {number} [options.batchDelay=5] - Delay in milliseconds before executing batch request.
   *                                         Set to 0 for immediate individual requests
   * @param {Object} [options.loaderContext] - Cache context for storing and retrieving items across requests.
   *                                          If provided, results will be stored in and retrieved from this context
   * @returns {Promise<Object>} Returns a promise that resolves to the found item instance or ObjectNotFound
   * @throws {Error} If the batch request times out or other errors occur during the operation
   */
  async find(primaryId, options = {}) {
    const batchDelay = options.batchDelay ?? DEFAULT_BATCH_DELAY_MS;
    const loaderContext = options.loaderContext;

    // Check loader context first
    if (loaderContext && loaderContext[primaryId]) {
      const cachedItem = loaderContext[primaryId];
      const instance = this._createFromDyItem(cachedItem._dyData);
      const consumedCapacity =
        cachedItem.getConsumedCapacity().consumedCapacity;
      instance._addConsumedCapacity(consumedCapacity, "read", true);
      return instance;
    }

    if (batchDelay === 0) {
      // Direct DynamoDB request logic
      const pkSk = this._parsePrimaryId(primaryId);
      const dyKey = this._getDyKeyForPkSk(pkSk);
      const result = await this.documentClient.send(
        new GetCommand({
          TableName: this.table,
          Key: dyKey,
          ReturnConsumedCapacity: "TOTAL",
        }),
      );

      let instance;
      if (!result.Item) {
        instance = new ObjectNotFound(result.ConsumedCapacity);
      } else {
        instance = this._createFromDyItem(result.Item);
        instance._addConsumedCapacity(result.ConsumedCapacity, "read", false);
      }

      // Add to loader context if provided
      if (loaderContext) {
        loaderContext[primaryId] = instance;
      }

      return instance;
    }

    // Batch request logic
    return new Promise((resolve, reject) => {
      const batchKey = `${this.name}-${batchDelay}`;
      const batchRequests = this._getBatchRequests();
      let batchRequest = batchRequests.get(batchKey);

      if (!batchRequest) {
        batchRequest = {
          model: this,
          items: [],
          timer: null,
          timeoutTimer: null,
          delay: batchDelay,
          createdAt: Date.now(),
          loaderContext,
        };
        batchRequests.set(batchKey, batchRequest);

        // Set batch execution timer
        batchRequest.timer = setTimeout(async () => {
          try {
            const currentBatch = batchRequests.get(batchKey);
            if (!currentBatch) return;

            const batchIds = currentBatch.items.map((item) => item.id);

            // Execute bulk find
            const { items, ConsumedCapacity } = await this.batchFind(
              batchIds,
              loaderContext,
            );

            // total callbacks
            const totalCallbacks = currentBatch.items.reduce(
              (sum, item) => sum + item.callbacks.length,
              0,
            );

            // Resolve all promises, including multiple callbacks for the same ID
            const consumedCapacity = {
              TableName: this.table,
              CapacityUnits:
                ConsumedCapacity[0]?.CapacityUnits / totalCallbacks,
            };

            currentBatch.items.forEach((batchItem) => {
              let item = items[batchItem.id];
              if (item) {
                item._addConsumedCapacity(consumedCapacity, "read", false);
              } else {
                item = new ObjectNotFound(consumedCapacity);
              }
              batchItem.callbacks.forEach((cb) => cb.resolve(item));
            });

            // Clean up the batch and BOTH timers
            if (currentBatch.timeoutTimer) {
              clearTimeout(currentBatch.timeoutTimer);
            }
            if (currentBatch.timer) {
              clearTimeout(currentBatch.timer);
            }
            batchRequests.delete(batchKey);
          } catch (error) {
            const currentBatch = batchRequests.get(batchKey);
            if (currentBatch) {
              currentBatch.items.forEach((batchItem) => {
                batchItem.callbacks.forEach((cb) => cb.reject(error));
              });
              if (currentBatch.timeoutTimer) {
                clearTimeout(currentBatch.timeoutTimer);
              }
              if (currentBatch.timer) {
                clearTimeout(currentBatch.timer);
              }
              batchRequests.delete(batchKey);
            }
          }
        }, batchDelay);

        // Set timeout timer
        batchRequest.timeoutTimer = setTimeout(() => {
          const currentBatch = batchRequests.get(batchKey);
          if (currentBatch === batchRequest) {
            if (currentBatch.timer) {
              clearTimeout(currentBatch.timer);
            }
            batchRequests.delete(batchKey);
            currentBatch.items.forEach((batchItem) => {
              batchItem.callbacks.forEach((cb) =>
                cb.reject(new Error("Batch request timed out")),
              );
            });
          }
        }, BATCH_REQUEST_TIMEOUT);
      }

      // Add this request to the batch
      const existingItem = batchRequest.items.find(
        (item) => item.id === primaryId,
      );
      if (existingItem) {
        existingItem.callbacks.push({ resolve, reject });
      } else {
        batchRequest.items.push({
          id: primaryId,
          callbacks: [{ resolve, reject }],
        });
      }
    });
  },
};

module.exports = {
  BatchLoadingMethods,
  BATCH_REQUESTS,
  DEFAULT_BATCH_DELAY_MS,
  BATCH_REQUEST_TIMEOUT,
};