mixins_mutation-mixin.js

const {
  TransactWriteCommand,
  UpdateCommand,
} = require("@aws-sdk/lib-dynamodb");
const { defaultLogger: logger } = require("../utils/logger");
const { pluginManager } = require("../plugin-manager");
const { retryOperation } = require("../utils/retry-helper");
const assert = require("assert");

const MutationMethods = {
  /**
   *@memberof BaoModel
   * @description
   * Create a new item in the database.
   * @param {Object} jsUpdates - The data to create the item with.
   * @returns {Promise<Object>} Returns a promise that resolves to the created item.
   */
  async create(jsUpdates) {
    await pluginManager.executeHooks(this.name, "beforeSave", jsUpdates, {
      isNew: true,
    });
    const result = await this._saveItem(null, jsUpdates, { isNew: true });

    logger.debug("create() - result", result);
    await pluginManager.executeHooks(this.name, "afterSave", result, {
      isNew: true,
    });
    return result;
  },

  /**
   *@memberof BaoModel
   * @description
   * Update an existing item in the database.
   * @param {string} primaryId - The primary ID of the item to update.
   * @param {Object} jsUpdates - The data to update the item with.
   * @param {Object} [options] - Additional options for the update operation.
   * @returns {Promise<Object>} Returns a promise that resolves to the updated item.
   */
  async update(primaryId, jsUpdates, options = {}) {
    const updateOptions = {
      isNew: false,
      ...options,
    };
    await pluginManager.executeHooks(
      this.name,
      "beforeSave",
      jsUpdates,
      updateOptions,
    );

    const result = await this._saveItem(primaryId, jsUpdates, updateOptions);

    logger.debug("update() - result", result);

    await pluginManager.executeHooks(
      this.name,
      "afterSave",
      result,
      updateOptions,
    );

    return result;
  },

  /**
   *@memberof BaoModel
   * @description
   * Delete an existing item in the database.
   * @param {string} primaryId - The primary ID of the item to delete.
   * @param {Object} [options] - Additional options for the delete operation.
   * @returns {Promise<Object>} Returns a promise that resolves to the deleted item.
   */
  async delete(primaryId, options = {}) {
    await pluginManager.executeHooks(
      this.name,
      "beforeDelete",
      primaryId,
      options,
    );

    const item = await this.find(primaryId, { batchDelay: 0 });
    if (!item) {
      throw new Error("Item not found");
    }

    const transactItems = [
      {
        Delete: {
          TableName: this.table,
          Key: {
            _pk: item._dyData._pk,
            _sk: item._dyData._sk,
          },
        },
      },
    ];

    // Add unique constraint cleanup
    const uniqueConstraints = Object.values(this.uniqueConstraints || {});
    if (uniqueConstraints.length > 0) {
      for (const constraint of uniqueConstraints) {
        const value = item[constraint.field];
        if (value) {
          const constraintOp = await this._removeUniqueConstraint(
            constraint.field,
            value,
            item.getPrimaryId(),
            constraint.constraintId,
          );
          transactItems.push(constraintOp);
        }
      }
    }

    const response = await retryOperation(() =>
      this.documentClient.send(
        new TransactWriteCommand({
          TransactItems: transactItems,
          ReturnConsumedCapacity: "TOTAL",
        }),
      ),
    );

    await pluginManager.executeHooks(
      this.name,
      "afterDelete",
      primaryId,
      options,
    );

    // Return deleted item info with capacity information
    item._setConsumedCapacity(response.ConsumedCapacity, "write", false);
    return item;
  },

  _createNewPrimaryId(jsUpdates) {
    // Now calculate primary key values using the processed data
    const pkValue = this._getPkValue(jsUpdates);
    const skValue = this._getSkValue(jsUpdates);

    // Format the primary key
    const pk = this._formatPrimaryKey(this.modelPrefix, pkValue);
    const primaryId = this.getPrimaryId({
      ...jsUpdates,
      _pk: pk,
      _sk: skValue,
    });

    return primaryId;
  },

  /**
   *@memberof BaoModel
   * @private
   * @description
   * Save an item to the database.
   * @param {string} primaryId - The primary ID of the item to save.
   * @param {Object} jsUpdates - The data to save the item with.
   * @param {Object} [options] - Additional options for the save operation.
   * @param {boolean} [options.isNew=false] - Whether this is a new item being created. Internal use only.
   * @param {Object} [options.instanceObj=null] - Existing model instance, if any. Internal use only.
   * @param {Object} [options.constraints={}] - Constraints to validate. Options are:
   * @param {boolean} [options.constraints.mustExist=false] - Whether the item must exist.
   * @param {boolean} [options.constraints.mustNotExist=false] - Whether the item must not exist.
   * @param {string[]} [options.constraints.fieldMatches=[]] - An array of field names that must match
   * the current item's loaded state. This is often used for optimistic locking in conjunction
   * with a {@link BaoFields.VersionField} field.
   * @returns {Promise<Object>} Returns a promise that resolves to the saved item.
   */
  async _saveItem(primaryId, jsUpdates, options = {}) {
    try {
      const { isNew = false, instanceObj = null, constraints = {} } = options;

      logger.debug("saveItem", primaryId, isNew, instanceObj);
      let consumedCapacity = [];

      let currentItem = instanceObj;
      if (isNew) {
        currentItem = null;
      } else if (!currentItem) {
        currentItem = await this.find(primaryId, { batchDelay: 0 });
        consumedCapacity = [
          ...consumedCapacity,
          ...currentItem.getConsumedCapacity(),
        ];
      }

      if (!isNew && !currentItem) {
        throw new Error("Item not found");
      }

      const transactItems = [];
      const dyUpdatesToSave = {};
      let hasUniqueConstraintChanges = false;

      logger.debug("jsUpdates", jsUpdates);

      // Generate Dynamo Updates to save
      for (const [key, field] of Object.entries(this.fields)) {
        if (isNew && jsUpdates[key] === undefined) {
          // Only set initial values during creation
          const initialValue = field.getInitialValue();
          if (initialValue !== undefined) {
            jsUpdates[key] = initialValue;
          }
        }

        if (jsUpdates[key] !== undefined) {
          field.validate(jsUpdates[key]);
          dyUpdatesToSave[key] = field.toDy(jsUpdates[key]);
        } else {
          if (typeof field.updateBeforeSave === "function") {
            const newValue = field.updateBeforeSave(jsUpdates[key]);
            if (newValue !== jsUpdates[key]) {
              dyUpdatesToSave[key] = field.toDy(newValue);
            }
          }
        }
      }

      if (isNew) {
        primaryId = this._createNewPrimaryId(jsUpdates);

        // validate required fields
        for (const [fieldName, field] of Object.entries(this.fields)) {
          if (
            field.required &&
            (jsUpdates[fieldName] === undefined ||
              jsUpdates[fieldName] === null)
          ) {
            throw new Error(`Field is required: ${fieldName} `);
          }
        }
      }

      Object.entries(jsUpdates).forEach(([fieldName, value]) => {
        const field = this._getField(fieldName);
        if (field) {
          field.validate(value, fieldName);
        }
      });

      // Validate unique constraints before attempting save
      await this._validateUniqueConstraints(
        jsUpdates,
        isNew ? null : primaryId,
      );

      // Handle unique constraints
      for (const constraint of Object.values(this.uniqueConstraints || {})) {
        const fieldName = constraint.field;
        const field = this._getField(fieldName);
        const dyNewValue = dyUpdatesToSave[fieldName];
        const dyCurrentValue = currentItem?._loadedDyData[fieldName];

        logger.debug("uniqueConstraint", field, dyCurrentValue, dyNewValue);

        if (dyNewValue !== undefined && dyNewValue !== dyCurrentValue) {
          hasUniqueConstraintChanges = true;

          // Remove old constraint if updating
          if (currentItem && dyCurrentValue) {
            transactItems.push(
              await this._removeUniqueConstraint(
                fieldName,
                dyCurrentValue,
                primaryId,
                constraint.constraintId,
              ),
            );
          }

          // Add new constraint
          transactItems.push(
            await this._createUniqueConstraint(
              fieldName,
              dyNewValue,
              primaryId,
              constraint.constraintId,
            ),
          );
        }
      }

      // Add testId if we're in test mode
      const testId = this.manager.getTestId();
      if (testId) {
        logger.debug("savedTestId", testId, currentItem);
        if (testId !== currentItem?._loadedDyData._gsi_test_id) {
          dyUpdatesToSave._gsi_test_id = testId;
        }
      }

      // Add GSI keys
      const indexKeys = this._getIndexKeys(dyUpdatesToSave);
      logger.debug("indexKeys", indexKeys);
      Object.assign(dyUpdatesToSave, indexKeys);

      // Build the condition expression for the update/put
      const conditionExpressions = [];
      const conditionNames = {};
      const conditionValues = {};

      // Handle existence constraints
      if (constraints.mustExist) {
        conditionExpressions.push("attribute_exists(#pk)");
        conditionNames["#pk"] = "_pk";
      }
      if (constraints.mustNotExist) {
        conditionExpressions.push("attribute_not_exists(#pk)");
        conditionNames["#pk"] = "_pk";
      }

      logger.debug("field matchconstraints", constraints);

      // Handle field match constraints
      if (constraints.fieldMatches) {
        if (instanceObj === null) {
          throw new Error("Instance object is required to check field matches");
        }

        const matchFields = Array.isArray(constraints.fieldMatches)
          ? constraints.fieldMatches
          : [constraints.fieldMatches];

        matchFields.forEach((fieldName, index) => {
          if (!this._getField(fieldName)) {
            throw new Error(
              `Unknown field in fieldMatches constraint: ${fieldName}`,
            );
          }

          const nameKey = `#match${index}`;
          const valueKey = `:match${index}`;

          conditionExpressions.push(`${nameKey} = ${valueKey}`);
          conditionNames[nameKey] = fieldName;
          conditionValues[valueKey] = currentItem
            ? currentItem._loadedDyData[fieldName]
            : undefined;
        });
      }

      // When building update expression, pass both old and new data
      const { updateExpression, names, values } =
        this._buildUpdateExpression(dyUpdatesToSave);

      const dyKey = this._getDyKeyForPkSk(this._parsePrimaryId(primaryId));
      logger.debug("dyKey", dyKey);

      // Create the update params
      const updateParams = {
        TableName: this.table,
        Key: dyKey,
        UpdateExpression: updateExpression,
        ExpressionAttributeNames: {
          ...names,
          ...conditionNames,
        },
        ReturnValues: "ALL_NEW",
      };

      if (Object.keys(values).length > 0) {
        updateParams.ExpressionAttributeValues = {
          ...values,
          ...conditionValues,
        };
      }

      // Add condition expression if we have any conditions
      if (conditionExpressions.length > 0) {
        updateParams.ConditionExpression = conditionExpressions.join(" AND ");
      } else if (isNew) {
        // Default condition for new items if no other conditions specified
        updateParams.ConditionExpression = "attribute_not_exists(#pk)";
        updateParams.ExpressionAttributeNames["#pk"] = "_pk";
      }

      try {
        let response;

        if (hasUniqueConstraintChanges) {
          // Use transaction if we have unique constraint changes
          transactItems.push({
            Update: updateParams,
          });

          logger.debug("transactItems", JSON.stringify(transactItems, null, 2));

          response = await retryOperation(() =>
            this.documentClient.send(
              new TransactWriteCommand({
                TransactItems: transactItems,
                ReturnConsumedCapacity: "TOTAL",
              }),
            ),
          );

          logger.debug("transactItems response", response);
          logger.debug("primaryId to load", primaryId);

          // Fetch the item since transactWrite doesn't return values
          const savedItem = await this.find(primaryId, { batchDelay: 0 });

          logger.debug("savedItem", savedItem);
          logger.debug("savedItem.getPrimaryId()", savedItem.getPrimaryId());

          if (!savedItem.exists()) {
            throw new Error("Failed to fetch saved item");
          }

          // Set the consumed capacity from the transaction
          savedItem._addConsumedCapacity(
            response.ConsumedCapacity,
            "write",
            false,
          );
          savedItem._addConsumedCapacity(consumedCapacity, "read", false);

          return savedItem;
        } else {
          // Use simple update if no unique constraints are changing
          logger.debug("updateParams", JSON.stringify(updateParams, null, 2));

          try {
            response = await retryOperation(() =>
              this.documentClient.send(
                new UpdateCommand({
                  ...updateParams,
                  ReturnConsumedCapacity: "TOTAL",
                }),
              ),
            );
          } catch (error) {
            logger.error(`DynamoDB update failed for ${primaryId}:`, error);
            throw error;
          }

          const savedItem = this._createFromDyItem(response.Attributes);
          logger.debug("savedItem", savedItem);

          savedItem._setConsumedCapacity(
            response.ConsumedCapacity,
            "write",
            false,
          );
          savedItem._addConsumedCapacity(consumedCapacity, "read", false);
          return savedItem;
        }
      } catch (error) {
        logger.error("Error in _saveItem", error);
        if (error.name === "ConditionalCheckFailedException") {
          if (constraints.mustExist) {
            throw new Error("Item must exist");
          }
          if (constraints.mustNotExist) {
            throw new Error("Item must not exist");
          }
          if (constraints.fieldMatches) {
            throw new Error("Field values have been modified");
          }
          throw new Error("Condition check failed", error);
        }

        if (error.name === "TransactionCanceledException") {
          await this._validateUniqueConstraints(
            jsUpdates,
            isNew ? null : primaryId,
          );
        }
        throw error;
      }
    } catch (error) {
      logger.error(`Error in _saveItem for ${primaryId}:`, error);
      throw error;
    }
  },

  _buildUpdateExpression(dyUpdatesToSave) {
    const names = {};
    const values = {};
    const expressions = [];

    logger.debug("dyUpdatesToSave", dyUpdatesToSave);

    // Process all fields in the data
    for (const [fieldName, value] of Object.entries(dyUpdatesToSave)) {
      // Skip undefined values
      if (value === undefined) continue;

      const field = this._getField(fieldName);

      // Handle null values differently - use REMOVE instead of SET
      if (value === null) {
        expressions.push({
          type: "REMOVE",
          expression: `#${fieldName}`,
          attrNameKey: `#${fieldName}`,
          fieldName: fieldName,
          fieldValue: null,
        });
      } else {
        expressions.push(field.getUpdateExpression(fieldName, value));
      }
    }

    const parts = [];
    const setExpressions = [];
    const addExpressions = [];
    const removeExpressions = [];

    expressions.forEach((expression) => {
      if (expression.type === "SET") {
        setExpressions.push(expression.expression);
      } else if (expression.type === "ADD") {
        addExpressions.push(expression.expression);
      } else if (expression.type === "REMOVE") {
        removeExpressions.push(expression.expression);
      }

      names[expression.attrNameKey] = expression.fieldName;

      if (expression.fieldValue !== null) {
        values[expression.attrValueKey] = expression.fieldValue;
      }
    });

    if (setExpressions.length > 0)
      parts.push(`SET ${setExpressions.join(", ")}`);
    if (addExpressions.length > 0)
      parts.push(`ADD ${addExpressions.join(", ")}`);
    if (removeExpressions.length > 0)
      parts.push(`REMOVE ${removeExpressions.join(", ")}`);

    return {
      updateExpression: parts.join(" "),
      names,
      values,
    };
  },

  _getIndexKeys(data) {
    const indexKeys = {};

    Object.entries(this.indexes).forEach(([indexName, index]) => {
      let pkValue, skValue;

      // Handle partition key
      if (index.pk === "modelPrefix") {
        pkValue = this.modelPrefix;
      } else {
        const pkField = this._getField(index.pk);
        pkValue = data[index.pk];
        if (pkValue !== undefined) {
          pkValue = pkField.toGsi(pkValue);
        }
      }

      // Handle sort key
      if (index.sk === "modelPrefix") {
        skValue = this.modelPrefix;
      } else {
        const skField = this._getField(index.sk);
        skValue = data[index.sk];
        if (skValue !== undefined) {
          skValue = skField.toGsi(skValue);
        }
      }

      if (
        pkValue !== undefined &&
        skValue !== undefined &&
        index.indexId !== undefined
      ) {
        logger.debug("indexKeys", {
          pkValue,
          skValue,
          indexId: index.indexId,
        });
        const gsiPk = this._formatGsiKey(
          this.modelPrefix,
          index.indexId,
          pkValue,
        );
        indexKeys[`_${index.indexId}_pk`] = gsiPk;
        indexKeys[`_${index.indexId}_sk`] = skValue;
      }
    });

    return indexKeys;
  },
};

module.exports = MutationMethods;