Do not emit 'close' event after finish with query #52
Comments
I’m willing to send a PR to fix this if its |
@mcollina just did. Thank you again. |
The implementation of the brianc/node-pg-query-stream is broken. For example, the stream doc states:
Here is a correct implementation:
const { Readable } = require('stream')
const Cursor = require('pg-cursor')

class PgQueryStream extends Readable {
  constructor(text, values, { rowMode = undefined, types = undefined, batchSize = 100 } = {}) {
    // https://nodejs.org/api/stream.html#stream_new_stream_readable_options
    super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
    this.cursor = new Cursor(text, values, { rowMode, types })
    this._reading = false
    this._callbacks = []
    // delegate Submittable callbacks to cursor
    this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
    this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
    this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor)
    this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor)
    this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor)
    this.handleError = this.cursor.handleError.bind(this.cursor)
  }

  submit(connection) {
    this.cursor.submit(connection)
  }

  close(callback) {
    if (this.destroyed) {
      if (callback) setImmediate(callback)
    } else {
      if (callback) this.once('close', callback)
      this.destroy()
    }
  }

  _close() {
    this.cursor.close(() => {
      let cb
      while ((cb = this._callbacks.pop())) cb()
    })
  }

  _destroy(_err, callback) {
    this._callbacks.push(callback)
    if (!this._reading) {
      this._close()
    }
  }

  // https://nodejs.org/api/stream.html#stream_readable_read_size_1
  _read(size) {
    // Prevent _destroy() from closing while reading
    this._reading = true
    this.cursor.read(size, (err, rows, result) => {
      this._reading = false
      if (this.destroyed) {
        // Destroyed while reading?
        this._close()
      } else if (err) {
        // https://nodejs.org/api/stream.html#stream_errors_while_reading
        this.destroy(err)
      } else {
        for (const row of rows) this.push(row)
        if (rows.length < size) this.push(null)
      }
    })
  }
}

module.exports = PgQueryStream |
@matthieusieben I would suggest opening a PR instead of posting a comment. It would also be good to have new tests. |
Also @matthieusieben, how is it possible to have multiple destroy callbacks? |
I don't think there should be more than one callback. This implementation is safe and works whether zero, one, or more callbacks are added. |
I didn't create a PR because I didn't want to bother writing tests & co. I just shared my implementation. I leave it to someone else to do it. |
If you wish, just create a pr with _callback being a nullable value instead of an array. |
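For anyone picking this up, here is a rough sketch of what the single nullable callback variant could look like. It is not the module's actual code: `FakeCursor` and `SingleCallbackStream` are hypothetical names, and the fake cursor only stands in for pg-cursor so the snippet runs without a database. One deliberate difference from the implementation posted above: this sketch forwards the error given to `_destroy()` to its callback, which is an assumption about the desired behavior.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for pg-cursor, only here to make the sketch runnable.
class FakeCursor {
  constructor(rows) {
    this.rows = rows.slice()
    this.closed = false
  }
  // Hands back up to `size` rows asynchronously, like a cursor read would.
  read(size, cb) {
    setImmediate(() => cb(null, this.rows.splice(0, size)))
  }
  close(cb) {
    this.closed = true
    setImmediate(cb)
  }
}

class SingleCallbackStream extends Readable {
  constructor(cursor, { batchSize = 100 } = {}) {
    super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
    this.cursor = cursor
    this._reading = false
    // Nullable value instead of an array: at most one pending _destroy() callback.
    this._destroyCallback = null
  }

  _close() {
    this.cursor.close(() => {
      const cb = this._destroyCallback
      this._destroyCallback = null
      if (cb) cb()
    })
  }

  _destroy(err, callback) {
    // Assumption: forward the destroy error so 'error' is still emitted.
    this._destroyCallback = () => callback(err)
    // If a read is in flight, _read()'s callback closes the cursor instead.
    if (!this._reading) this._close()
  }

  _read(size) {
    this._reading = true
    this.cursor.read(size, (err, rows) => {
      this._reading = false
      if (this.destroyed) {
        this._close()
      } else if (err) {
        this.destroy(err)
      } else {
        for (const row of rows) this.push(row)
        // A short batch means the cursor is exhausted.
        if (rows.length < size) this.push(null)
      }
    })
  }
}
```

Since Node only calls `_destroy()` once per stream, a single slot should be enough; the array in the original just tolerates the case where that assumption does not hold.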
I do not want to spend any more time on this project.
On Fri, Dec 6, 2019, 17:50 Matthieu Sieben <[email protected]> wrote:
… If you wish, just create a pr with _callback being a nullable value
instead of an array.
|
I'm going to port this repo over to the pg monorepo. After that I'll rewrite this module & it will be tested under all supported versions of node using the proper version of |
If you do this, please start from scratch so that we get a proper implementation of the stream interface, without keeping the legacy from this package! |
Will try my best! If you're interested in supporting my continued work please consider sponsoring me! |
We found that one of our apps could not run on Node 11 because of a breaking change. I filed a bug in the nodejs repo, and the Node maintainers said it is a bug in this module instead: a stream should not emit 'close' before 'end'.
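To make the expected ordering concrete, here is a small sketch that records the lifecycle events of any Readable. `recordEventOrder` is a hypothetical helper, not part of this module or of Node; it only assumes the stream is created with `emitClose` and `autoDestroy` enabled so that 'close' is actually emitted once the stream ends.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: resolves with the order in which 'end' and 'close' fire.
function recordEventOrder(readable) {
  const events = []
  return new Promise((resolve, reject) => {
    readable.on('end', () => events.push('end'))
    readable.on('error', reject)
    readable.on('close', () => {
      events.push('close')
      resolve(events)
    })
    // Switch to flowing mode and discard the data so the stream can finish.
    readable.resume()
  })
}
```

Pointing this helper at a query stream is one way to check the order reported above: a well-behaved stream that is read to completion should report 'end' first and 'close' second.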