index.js 5.91 KB
// put javascript in here
'use strict'

const Parser = require('jsonparse')
const Minipass = require('minipass')

class JSONStreamError extends Error {
  constructor (err, caller) {
    super(err.message)
    Error.captureStackTrace(this, caller || this.constructor)
  }
  get name () {
    return 'JSONStreamError'
  }
  set name (n) {}
}

const check = (x, y) =>
  typeof x === 'string' ? String(y) === x
  : x && typeof x.test === 'function' ? x.test(y)
  : typeof x === 'boolean' || typeof x === 'object' ? x
  : typeof x === 'function' ? x(y)
  : false

const _parser = Symbol('_parser')
const _onValue = Symbol('_onValue')
const _onTokenOriginal = Symbol('_onTokenOriginal')
const _onToken = Symbol('_onToken')
const _onError = Symbol('_onError')
const _count = Symbol('_count')
const _path = Symbol('_path')
const _map = Symbol('_map')
const _root = Symbol('_root')
const _header = Symbol('_header')
const _footer = Symbol('_footer')
const _setHeaderFooter = Symbol('_setHeaderFooter')
const _ending = Symbol('_ending')

class JSONStream extends Minipass {
  constructor (opts = {}) {
    super({
      ...opts,
      objectMode: true,
    })

    this[_ending] = false
    const parser = this[_parser] = new Parser()
    parser.onValue = value => this[_onValue](value)
    this[_onTokenOriginal] = parser.onToken
    parser.onToken = (token, value) => this[_onToken](token, value)
    parser.onError = er => this[_onError](er)

    this[_count] = 0
    this[_path] = typeof opts.path === 'string'
      ? opts.path.split('.').map(e =>
          e === '$*' ? { emitKey: true }
          : e === '*' ? true
          : e === '' ? { recurse: true }
          : e)
      : Array.isArray(opts.path) && opts.path.length ? opts.path
      : null

    this[_map] = typeof opts.map === 'function' ? opts.map : null
    this[_root] = null
    this[_header] = null
    this[_footer] = null
    this[_count] = 0
  }

  [_setHeaderFooter] (key, value) {
    // header has not been emitted yet
    if (this[_header] !== false) {
      this[_header] = this[_header] || {}
      this[_header][key] = value
    }

    // footer has not been emitted yet but header has
    if (this[_footer] !== false && this[_header] === false) {
      this[_footer] = this[_footer] || {}
      this[_footer][key] = value
    }
  }

  [_onError] (er) {
    // error will always happen during a write() call.
    const caller = this[_ending] ? this.end : this.write
    this[_ending] = false
    return this.emit('error', new JSONStreamError(er, caller))
  }

  [_onToken] (token, value) {
    const parser = this[_parser]
    this[_onTokenOriginal].call(parser, token, value)
    if (parser.stack.length === 0) {
      if (this[_root]) {
        const root = this[_root]
        if (!this[_path])
          super.write(root)
        this[_root] = null
        this[_count] = 0
      }
    }
  }

  [_onValue] (value) {
    const parser = this[_parser]
    // the LAST onValue encountered is the root object.
    // just overwrite it each time.
    this[_root] = value

    if(!this[_path]) return

    let i = 0 // iterates on path
    let j  = 0 // iterates on stack
    let emitKey = false
    let emitPath = false
    while (i < this[_path].length) {
      const key = this[_path][i]
      j++

      if (key && !key.recurse) {
        const c = (j === parser.stack.length) ? parser : parser.stack[j]
        if (!c) return
        if (!check(key, c.key)) {
          this[_setHeaderFooter](c.key, value)
          return
        }
        emitKey = !!key.emitKey;
        emitPath = !!key.emitPath;
        i++
      } else {
        i++
        if (i >= this[_path].length)
          return
        const nextKey = this[_path][i]
        if (!nextKey)
          return
        while (true) {
          const c = (j === parser.stack.length) ? parser : parser.stack[j]
          if (!c) return
          if (check(nextKey, c.key)) {
            i++
            if (!Object.isFrozen(parser.stack[j]))
              parser.stack[j].value = null
            break
          } else {
            this[_setHeaderFooter](c.key, value)
          }
          j++
        }
      }
    }

    // emit header
    if (this[_header]) {
      const header = this[_header]
      this[_header] = false
      this.emit('header', header)
    }
    if (j !== parser.stack.length) return

    this[_count] ++
    const actualPath = parser.stack.slice(1)
      .map(e => e.key).concat([parser.key])
    if (value !== null && value !== undefined) {
      const data = this[_map] ? this[_map](value, actualPath) : value
      if (data !== null && data !== undefined) {
        const emit = emitKey || emitPath ? { value: data } : data
        if (emitKey)
          emit.key = parser.key
        if (emitPath)
          emit.path = actualPath
        super.write(emit)
      }
    }

    if (parser.value)
      delete parser.value[parser.key]

    for (const k of parser.stack) {
      k.value = null
    }
  }

  write (chunk, encoding, cb) {
    if (typeof encoding === 'function')
      cb = encoding, encoding = null
    if (typeof chunk === 'string')
      chunk = Buffer.from(chunk, encoding)
    else if (!Buffer.isBuffer(chunk))
      return this.emit('error', new TypeError(
        'Can only parse JSON from string or buffer input'))
    this[_parser].write(chunk)
    if (cb)
      cb()
    return this.flowing
  }

  end (chunk, encoding, cb) {
    this[_ending] = true
    if (typeof encoding === 'function')
      cb = encoding, encoding = null
    if (typeof chunk === 'function')
      cb = chunk, chunk = null
    if (chunk)
      this.write(chunk, encoding)
    if (cb)
      this.once('end', cb)

    const h = this[_header]
    this[_header] = null
    const f = this[_footer]
    this[_footer] = null
    if (h)
      this.emit('header', h)
    if (f)
      this.emit('footer', f)
    return super.end()
  }

  static get JSONStreamError () { return JSONStreamError }
  static parse (path, map) {
    return new JSONStream({path, map})
  }
}

module.exports = JSONStream