Source: index.js

let rethinkdbdash = require('rethinkdbdash')
let JSData = require('js-data')
let { DSUtils } = JSData
let { upperCase, contains, forOwn, isEmpty, keys, deepMixIn, forEach, isObject, isString, removeCircular, omit } = DSUtils

let underscore = require('mout/string/underscore')

const reserved = [
  'orderBy',
  'sort',
  'limit',
  'offset',
  'skip',
  'where'
]

const addHiddenPropsToTarget = function (target, props) {
  DSUtils.forOwn(props, function (value, key) {
    props[key] = {
      writable: true,
      value
    }
  })
  Object.defineProperties(target, props)
}

const fillIn = function (dest, src) {
  DSUtils.forOwn(src, function (value, key) {
    if (!dest.hasOwnProperty(key) || dest[key] === undefined) {
      dest[key] = value
    }
  })
}

const unique = function (array) {
  const seen = {}
  const final = []
  array.forEach(function (item) {
    if (item in seen) {
      return
    }
    final.push(item)
    seen[item] = 0
  })
  return final
}

class Defaults {

}

addHiddenPropsToTarget(Defaults.prototype, {
  host: 'localhost',
  port: 28015,
  authKey: '',
  db: 'test',
  min: 10,
  max: 50,
  bufferSize: 10
})

/**
 * RethinkDBAdapter class.
 *
 * @example
 * import {DS} from 'js-data'
 * import RethinkDBAdapter from 'js-data-rethinkdb'
 * const store = new DS()
 * const adapter = new RethinkDBAdapter()
 * store.registerAdapter('rethinkdb', adapter, { 'default': true })
 *
 * @class RethinkDBAdapter
 * @param {Object} [opts] Configuration opts.
 * @param {string} [opts.host='localhost'] TODO
 * @param {number} [opts.port=28015] TODO
 * @param {string} [opts.authKey=''] TODO
 * @param {string} [opts.db='test'] TODO
 * @param {number} [opts.min=10] TODO
 * @param {number} [opts.max=50] TODO
 * @param {number} [opts.bufferSize=10] TODO
 */
export default function RethinkDBAdapter (opts) {
  const self = this

  self.defaults = new Defaults()
  deepMixIn(self.defaults, opts)
  fillIn(self, opts)
  self.r = rethinkdbdash(self.defaults)
  self.databases = {}
  self.tables = {}
  self.indices = {}
}

addHiddenPropsToTarget(RethinkDBAdapter.prototype, {
  _handleErrors (cursor) {
    if (cursor && cursor.errors > 0) {
      if (cursor.first_error) {
        throw new Error(cursor.first_error)
      }
      throw new Error('Unknown RethinkDB Error')
    }
  },

  selectTable (Resource, opts) {
    return this.r.db(opts.db || this.defaults.db).table(Resource.table || underscore(Resource.name))
  },

  filterSequence (sequence, params) {
    let r = this.r
    params = params || {}
    params.where = params.where || {}
    params.orderBy = params.orderBy || params.sort
    params.skip = params.skip || params.offset

    forEach(keys(params), function (k) {
      let v = params[k]
      if (!contains(reserved, k)) {
        if (isObject(v)) {
          params.where[k] = v
        } else {
          params.where[k] = {
            '==': v
          }
        }
        delete params[k]
      }
    })

    let query = sequence

    if (!isEmpty(params.where)) {
      query = query.filter((row) => {
        let subQuery
        forOwn(params.where, function (criteria, field) {
          if (!isObject(criteria)) {
            criteria = {'==': criteria}
          }
          forOwn(criteria, function (v, op) {
            if (op === '==' || op === '===') {
              subQuery = subQuery ? subQuery.and(row(field).default(null).eq(v)) : row(field).default(null).eq(v)
            } else if (op === '!=' || op === '!==') {
              subQuery = subQuery ? subQuery.and(row(field).default(null).ne(v)) : row(field).default(null).ne(v)
            } else if (op === '>') {
              subQuery = subQuery ? subQuery.and(row(field).default(null).gt(v)) : row(field).default(null).gt(v)
            } else if (op === '>=') {
              subQuery = subQuery ? subQuery.and(row(field).default(null).ge(v)) : row(field).default(null).ge(v)
            } else if (op === '<') {
              subQuery = subQuery ? subQuery.and(row(field).default(null).lt(v)) : row(field).default(null).lt(v)
            } else if (op === '<=') {
              subQuery = subQuery ? subQuery.and(row(field).default(null).le(v)) : row(field).default(null).le(v)
            } else if (op === 'isectEmpty') {
              subQuery = subQuery ? subQuery.and(row(field).default([]).setIntersection(r.expr(v).default([])).count().eq(0)) : row(field).default([]).setIntersection(r.expr(v).default([])).count().eq(0)
            } else if (op === 'isectNotEmpty') {
              subQuery = subQuery ? subQuery.and(row(field).default([]).setIntersection(r.expr(v).default([])).count().ne(0)) : row(field).default([]).setIntersection(r.expr(v).default([])).count().ne(0)
            } else if (op === 'in') {
              subQuery = subQuery ? subQuery.and(r.expr(v).default(r.expr([])).contains(row(field).default(null))) : r.expr(v).default(r.expr([])).contains(row(field).default(null))
            } else if (op === 'notIn') {
              subQuery = subQuery ? subQuery.and(r.expr(v).default(r.expr([])).contains(row(field).default(null)).not()) : r.expr(v).default(r.expr([])).contains(row(field).default(null)).not()
            } else if (op === '|==' || op === '|===') {
              subQuery = subQuery ? subQuery.or(row(field).default(null).eq(v)) : row(field).default(null).eq(v)
            } else if (op === '|!=' || op === '|!==') {
              subQuery = subQuery ? subQuery.or(row(field).default(null).ne(v)) : row(field).default(null).ne(v)
            } else if (op === '|>') {
              subQuery = subQuery ? subQuery.or(row(field).default(null).gt(v)) : row(field).default(null).gt(v)
            } else if (op === '|>=') {
              subQuery = subQuery ? subQuery.or(row(field).default(null).ge(v)) : row(field).default(null).ge(v)
            } else if (op === '|<') {
              subQuery = subQuery ? subQuery.or(row(field).default(null).lt(v)) : row(field).default(null).lt(v)
            } else if (op === '|<=') {
              subQuery = subQuery ? subQuery.or(row(field).default(null).le(v)) : row(field).default(null).le(v)
            } else if (op === '|isectEmpty') {
              subQuery = subQuery ? subQuery.or(row(field).default([]).setIntersection(r.expr(v).default([])).count().eq(0)) : row(field).default([]).setIntersection(r.expr(v).default([])).count().eq(0)
            } else if (op === '|isectNotEmpty') {
              subQuery = subQuery ? subQuery.or(row(field).default([]).setIntersection(r.expr(v).default([])).count().ne(0)) : row(field).default([]).setIntersection(r.expr(v).default([])).count().ne(0)
            } else if (op === '|in') {
              subQuery = subQuery ? subQuery.or(r.expr(v).default(r.expr([])).contains(row(field).default(null))) : r.expr(v).default(r.expr([])).contains(row(field).default(null))
            } else if (op === '|notIn') {
              subQuery = subQuery ? subQuery.or(r.expr(v).default(r.expr([])).contains(row(field).default(null)).not()) : r.expr(v).default(r.expr([])).contains(row(field).default(null)).not()
            }
          })
        })
        return subQuery
      })
    }

    if (params.orderBy) {
      if (isString(params.orderBy)) {
        params.orderBy = [
          [params.orderBy, 'asc']
        ]
      }
      for (var i = 0; i < params.orderBy.length; i++) {
        if (isString(params.orderBy[i])) {
          params.orderBy[i] = [params.orderBy[i], 'asc']
        }
        query = upperCase(params.orderBy[i][1]) === 'DESC' ? query.orderBy(r.desc(params.orderBy[i][0])) : query.orderBy(params.orderBy[i][0])
      }
    }

    if (params.skip) {
      query = query.skip(+params.skip)
    }

    if (params.limit) {
      query = query.limit(+params.limit)
    }

    return query
  },

  waitForDb (opts) {
    const self = this
    opts = opts || {}
    let db = opts.db || self.defaults.db
    if (!self.databases[db]) {
      self.databases[db] = self.r.branch(
        self.r.dbList().contains(db),
        true,
        self.r.dbCreate(db)
      ).run()
    }
    return self.databases[db]
  },

  /**
   * Create a new record.
   *
   * @name RethinkDBAdapter#create
   * @method
   * @param {Object} Resource The Resource.
   * @param {Object} props The record to be created.
   * @param {Object} [opts] Configuration options.
   * @return {Promise}
   */
  create (Resource, props, opts) {
    const self = this
    props = removeCircular(omit(props, Resource.relationFields || []))
    opts || (opts = {})

    return self.waitForTable(Resource.table || underscore(Resource.name), opts).then(function () {
      return self.selectTable(Resource, opts).insert(props, {returnChanges: true}).run()
    }).then(function (cursor) {
      self._handleErrors(cursor)
      return cursor.changes[0].new_val
    })
  },

  /**
   * Destroy the record with the given primary key.
   *
   * @name RethinkDBAdapter#destroy
   * @method
   * @param {Object} Resource The Resource.
   * @param {(string|number)} id Primary key of the record to destroy.
   * @param {Object} [opts] Configuration options.
   * @return {Promise}
   */
  destroy (Resource, id, opts) {
    const self = this
    opts || (opts = {})

    return self.waitForTable(Resource.table || underscore(Resource.name), opts).then(function () {
      return self.selectTable(Resource, opts).get(id).delete().run()
    }).then(function () {
      return undefined
    })
  },

  /**
   * Destroy the records that match the selection query.
   *
   * @name RethinkDBAdapter#destroyAll
   * @method
   * @param {Object} Resource the Resource.
   * @param {Object} [query] Selection query.
   * @param {Object} [opts] Configuration options.
   * @return {Promise}
   */
  destroyAll (Resource, query, opts) {
    const self = this
    query || (query = {})
    opts || (opts = {})

    return self.waitForTable(Resource.table || underscore(Resource.name), opts).then(function () {
      return self.filterSequence(self.selectTable(Resource, opts), query).delete().run()
    }).then(function () {
      return undefined
    })
  },

  /**
   * TODO
   *
   * There may be reasons why you may want to override this method, like when
   * the id of the parent doesn't exactly match up to the key on the child.
   *
   * @name RethinkDBAdapter#makeHasManyForeignKey
   * @method
   * @return {*}
   */
  makeHasManyForeignKey (Resource, def, record) {
    return DSUtils.get(record, Resource.idAttribute)
  },

  /**
   * TODO
   *
   * @name RethinkDBAdapter#loadHasMany
   * @method
   * @return {Promise}
   */
  loadHasMany (Resource, def, records, __options) {
    const self = this
    let singular = false

    if (DSUtils.isObject(records) && !DSUtils.isArray(records)) {
      singular = true
      records = [records]
    }
    const IDs = records.map(function (record) {
      return self.makeHasManyForeignKey(Resource, def, record)
    })
    const query = {}
    const criteria = query[def.foreignKey] = {}
    if (singular) {
      // more efficient query when we only have one record
      criteria['=='] = IDs[0]
    } else {
      criteria['in'] = IDs.filter(function (id) {
        return id
      })
    }
    return self.findAll(Resource.getResource(def.relation), query, __options).then(function (relatedItems) {
      records.forEach(function (record) {
        let attached = []
        // avoid unneccesary iteration when we only have one record
        if (singular) {
          attached = relatedItems
        } else {
          relatedItems.forEach(function (relatedItem) {
            if (DSUtils.get(relatedItem, def.foreignKey) === record[Resource.idAttribute]) {
              attached.push(relatedItem)
            }
          })
        }
        DSUtils.set(record, def.localField, attached)
      })
    })
  },

  /**
   * TODO
   *
   * @name RethinkDBAdapter#loadHasOne
   * @method
   * @return {Promise}
   */
  loadHasOne (Resource, def, records, __options) {
    if (DSUtils.isObject(records) && !DSUtils.isArray(records)) {
      records = [records]
    }
    return this.loadHasMany(Resource, def, records, __options).then(function () {
      records.forEach(function (record) {
        const relatedData = DSUtils.get(record, def.localField)
        if (DSUtils.isArray(relatedData) && relatedData.length) {
          DSUtils.set(record, def.localField, relatedData[0])
        }
      })
    })
  },

  /**
   * TODO
   *
   * @name RethinkDBAdapter#makeBelongsToForeignKey
   * @method
   * @return {*}
   */
  makeBelongsToForeignKey (Resource, def, record) {
    return DSUtils.get(record, def.localKey)
  },

  /**
   * TODO
   *
   * @name RethinkDBAdapter#loadBelongsTo
   * @method
   * @return {Promise}
   */
  loadBelongsTo (Resource, def, records, __options) {
    const self = this
    const relationDef = Resource.getResource(def.relation)

    if (DSUtils.isObject(records) && !DSUtils.isArray(records)) {
      const record = records
      return self.find(relationDef, self.makeBelongsToForeignKey(Resource, def, record), __options).then(function (relatedItem) {
        DSUtils.set(record, def.localField, relatedItem)
      })
    } else {
      const keys = records.map(function (record) {
        return self.makeBelongsToForeignKey(Resource, def, record)
      }).filter(function (key) {
        return key
      })
      return self.findAll(relationDef, {
        where: {
          [relationDef.idAttribute]: {
            'in': keys
          }
        }
      }, __options).then(function (relatedItems) {
        records.forEach(function (record) {
          relatedItems.forEach(function (relatedItem) {
            if (relatedItem[relationDef.idAttribute] === record[def.localKey]) {
              DSUtils.set(record, def.localField, relatedItem)
            }
          })
        })
      })
    }
  },

  /**
   * Retrieve the record with the given primary key.
   *
   * @name RethinkDBAdapter#find
   * @method
   * @param {Object} Resource The Resource.
   * @param {(string|number)} id Primary key of the record to retrieve.
   * @param {Object} [opts] Configuration options.
   * @param {string[]} [opts.with=[]] TODO
   * @return {Promise}
   */
  find (Resource, id, opts) {
    const self = this
    opts || (opts = {})
    opts.with || (opts.with = [])

    let instance
    const table = Resource.table || underscore(Resource.name)
    const relationList = Resource.relationList || []
    let tasks = [self.waitForTable(table, opts)]

    relationList.forEach(function (def) {
      const relationName = def.relation
      const relationDef = Resource.getResource(relationName)
      if (!relationDef) {
        throw new JSData.DSErrors.NER(relationName)
      } else if (!opts.with || !contains(opts.with, relationName)) {
        return
      }
      if (def.foreignKey) {
        tasks.push(self.waitForIndex(relationDef.table || underscore(relationDef.name), def.foreignKey, opts))
      } else if (def.localKey) {
        tasks.push(self.waitForIndex(Resource.table || underscore(Resource.name), def.localKey, opts))
      }
    })
    return DSUtils.Promise.all(tasks).then(function () {
      return self.selectTable(Resource, opts).get(id).run()
    }).then(function (_instance) {
      if (!_instance) {
        throw new Error('Not Found!')
      }
      instance = _instance
      let tasks = []

      relationList.forEach(function (def) {
        let relationName = def.relation
        let relationDef = Resource.getResource(relationName)
        let containedName = null
        if (opts.with.indexOf(relationName) !== -1) {
          containedName = relationName
        } else if (opts.with.indexOf(def.localField) !== -1) {
          containedName = def.localField
        }
        if (containedName) {
          let __options = DSUtils.deepMixIn({}, opts.orig ? opts.orig() : opts)
          __options.with = opts.with.slice()
          __options = DSUtils._(relationDef, __options)
          DSUtils.remove(__options.with, containedName)
          __options.with.forEach(function (relation, i) {
            if (relation && relation.indexOf(containedName) === 0 && relation.length >= containedName.length && relation[containedName.length] === '.') {
              __options.with[i] = relation.substr(containedName.length + 1)
            } else {
              __options.with[i] = ''
            }
          })

          let task

          if (def.foreignKey && (def.type === 'hasOne' || def.type === 'hasMany')) {
            if (def.type === 'hasOne') {
              task = self.loadHasOne(Resource, def, instance, __options)
            } else {
              task = self.loadHasMany(Resource, def, instance, __options)
            }
          } else if (def.type === 'hasMany' && def.localKeys) {
            let localKeys = []
            let itemKeys = instance[def.localKeys] || []
            itemKeys = DSUtils.isArray(itemKeys) ? itemKeys : DSUtils.keys(itemKeys)
            localKeys = localKeys.concat(itemKeys || [])
            task = self.findAll(Resource.getResource(relationName), {
              where: {
                [relationDef.idAttribute]: {
                  'in': unique(localKeys).filter((x) => x)
                }
              }
            }, __options).then(function (relatedItems) {
              DSUtils.set(instance, def.localField, relatedItems)
              return relatedItems
            })
          } else if (def.type === 'belongsTo' || (def.type === 'hasOne' && def.localKey)) {
            task = self.loadBelongsTo(Resource, def, instance, __options)
          }

          if (task) {
            tasks.push(task)
          }
        }
      })

      return DSUtils.Promise.all(tasks)
    }).then(function () {
      return instance
    })
  },

  /**
   * Retrieve the records that match the selection query.
   *
   * @name RethinkDBAdapter#findAll
   * @method
   * @param {Object} Resource The Resource.
   * @param {Object} query Selection query.
   * @param {Object} [opts] Configuration options.
   * @param {string[]} [opts.with=[]] TODO
   * @return {Promise}
   */
  findAll (Resource, query, opts) {
    const self = this
    opts || (opts = {})
    opts.with || (opts.with = [])

    let items = null
    const table = Resource.table || underscore(Resource.name)
    const relationList = Resource.relationList || []
    let tasks = [self.waitForTable(table, opts)]

    relationList.forEach(function (def) {
      const relationName = def.relation
      const relationDef = Resource.getResource(relationName)
      if (!relationDef) {
        throw new JSData.DSErrors.NER(relationName)
      } else if (!opts.with || !contains(opts.with, relationName)) {
        return
      }
      if (def.foreignKey) {
        tasks.push(self.waitForIndex(relationDef.table || underscore(relationDef.name), def.foreignKey, opts))
      } else if (def.localKey) {
        tasks.push(self.waitForIndex(Resource.table || underscore(Resource.name), def.localKey, opts))
      }
    })
    return DSUtils.Promise.all(tasks).then(function () {
      return self.filterSequence(self.selectTable(Resource, opts), query).run()
    }).then(function (_items) {
      items = _items
      let tasks = []
      const relationList = Resource.relationList || []
      relationList.forEach(function (def) {
        let relationName = def.relation
        let relationDef = Resource.getResource(relationName)
        let containedName = null
        if (opts.with.indexOf(relationName) !== -1) {
          containedName = relationName
        } else if (opts.with.indexOf(def.localField) !== -1) {
          containedName = def.localField
        }
        if (containedName) {
          let __options = DSUtils.deepMixIn({}, opts.orig ? opts.orig() : opts)
          __options.with = opts.with.slice()
          __options = DSUtils._(relationDef, __options)
          DSUtils.remove(__options.with, containedName)
          __options.with.forEach(function (relation, i) {
            if (relation && relation.indexOf(containedName) === 0 && relation.length >= containedName.length && relation[containedName.length] === '.') {
              __options.with[i] = relation.substr(containedName.length + 1)
            } else {
              __options.with[i] = ''
            }
          })

          let task

          if (def.foreignKey && (def.type === 'hasOne' || def.type === 'hasMany')) {
            if (def.type === 'hasMany') {
              task = self.loadHasMany(Resource, def, items, __options)
            } else {
              task = self.loadHasOne(Resource, def, items, __options)
            }
          } else if (def.type === 'hasMany' && def.localKeys) {
            let localKeys = []
            items.forEach(function (item) {
              let itemKeys = item[def.localKeys] || []
              itemKeys = DSUtils.isArray(itemKeys) ? itemKeys : Object.keys(itemKeys)
              localKeys = localKeys.concat(itemKeys || [])
            })
            task = self.findAll(Resource.getResource(relationName), {
              where: {
                [relationDef.idAttribute]: {
                  'in': unique(localKeys).filter((x) => x)
                }
              }
            }, __options).then(function (relatedItems) {
              items.forEach(function (item) {
                let attached = []
                let itemKeys = item[def.localKeys] || []
                itemKeys = DSUtils.isArray(itemKeys) ? itemKeys : DSUtils.keys(itemKeys)
                relatedItems.forEach(function (relatedItem) {
                  if (itemKeys && itemKeys.indexOf(relatedItem[relationDef.idAttribute]) !== -1) {
                    attached.push(relatedItem)
                  }
                })
                DSUtils.set(item, def.localField, attached)
              })
              return relatedItems
            })
          } else if (def.type === 'belongsTo' || (def.type === 'hasOne' && def.localKey)) {
            task = self.loadBelongsTo(Resource, def, items, __options)
          }

          if (task) {
            tasks.push(task)
          }
        }
      })
      return DSUtils.Promise.all(tasks)
    }).then(function () {
      return items
    })
  },

  /**
   * Apply the given update to the record with the specified primary key.
   *
   * @name RethinkDBAdapter#update
   * @method
   * @param {Object} Resource The Resource.
   * @param {(string|number)} id The primary key of the record to be updated.
   * @param {Object} props The update to apply to the record.
   * @param {Object} [opts] Configuration options.
   * @return {Promise}
   */
  update (resourceConfig, id, attrs, options) {
    attrs = removeCircular(omit(attrs, resourceConfig.relationFields || []))
    options = options || {}
    return this.waitForTable(resourceConfig.table || underscore(resourceConfig.name), options).then(() => {
      return this.r.db(options.db || this.defaults.db).table(resourceConfig.table || underscore(resourceConfig.name)).get(id).update(attrs, {returnChanges: true}).run()
    }).then((cursor) => {
      this._handleErrors(cursor)
      if (cursor.changes && cursor.changes.length && cursor.changes[0].new_val) {
        return cursor.changes[0].new_val
      } else {
        return this.selectTable(resourceConfig, options).get(id).run()
      }
    })
  },

  /**
   * Apply the given update to all records that match the selection query.
   *
   * @name RethinkDBAdapter#updateAll
   * @method
   * @param {Object} Resource The Resource.
   * @param {Object} props The update to apply to the selected records.
   * @param {Object} [query] Selection query.
   * @param {Object} [opts] Configuration options.
   * @return {Promise}
   */
  updateAll (resourceConfig, attrs, params, options) {
    attrs = removeCircular(omit(attrs, resourceConfig.relationFields || []))
    options = options || {}
    params = params || {}
    return this.waitForTable(resourceConfig.table || underscore(resourceConfig.name), options).then(() => {
      return this.filterSequence(this.selectTable(resourceConfig, options), params).update(attrs, {returnChanges: true}).run()
    }).then((cursor) => {
      this._handleErrors(cursor)
      if (cursor && cursor.changes && cursor.changes.length) {
        let items = []
        cursor.changes.forEach((change) => items.push(change.new_val))
        return items
      } else {
        return this.filterSequence(this.selectTable(resourceConfig, options), params).run()
      }
    })
  },

  waitForTable (table, options) {
    options = options || {}
    let db = options.db || this.defaults.db
    return this.waitForDb(options).then(() => {
      this.tables[db] = this.tables[db] || {}
      if (!this.tables[db][table]) {
        this.tables[db][table] = this.r.branch(this.r.db(db).tableList().contains(table), true, this.r.db(db).tableCreate(table)).run()
      }
      return this.tables[db][table]
    })
  },

  waitForIndex (table, index, options) {
    options = options || {}
    let db = options.db || this.defaults.db
    return this.waitForDb(options).then(() => this.waitForTable(table, options)).then(() => {
      this.indices[db] = this.indices[db] || {}
      this.indices[db][table] = this.indices[db][table] || {}
      if (!this.tables[db][table][index]) {
        this.tables[db][table][index] = this.r.branch(this.r.db(db).table(table).indexList().contains(index), true, this.r.db(db).table(table).indexCreate(index)).run().then(() => {
          return this.r.db(db).table(table).indexWait(index).run()
        })
      }
      return this.tables[db][table][index]
    })
  }
})