Merge pull request #1280 from tediousjs/arthur/new-bulkload-api
New BulkLoad streaming API
arthurschreiber authored Aug 1, 2021
2 parents 8a4a4e8 + 5178ffa commit 54c141a
Showing 4 changed files with 767 additions and 32 deletions.
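At its core, the change extends `Connection.execBulkLoad` to accept the rows to insert as a second argument: either an `Iterable` (such as a plain array) or an `AsyncIterable` (such as a `Readable` stream or an async generator). A minimal sketch of the new call shape, based on the documentation updated in this diff (table and column names are illustrative):

```js
const bulkLoad = connection.newBulkLoad('example_table', (err, rowCount) => {
  if (err) {
    throw err;
  }

  console.log(`inserted ${rowCount} rows`);
});

bulkLoad.addColumn('id', TYPES.Int, { nullable: false });

// Rows are handed to execBulkLoad directly instead of being added
// one by one with the now-deprecated bulkLoad.addRow().
connection.execBulkLoad(bulkLoad, [
  { id: 1 },
  { id: 2 },
  { id: 3 }
]);
```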
61 changes: 61 additions & 0 deletions benchmarks/bulk-load/iterable.js
@@ -0,0 +1,61 @@
// @ts-check

const { createBenchmark, createConnection } = require('../common');

const { Request, TYPES } = require('../../src/tedious');

const bench = createBenchmark(main, {
  n: [10, 100],
  size: [
    10,
    100,
    1000,
    10000
  ]
});

function main({ n, size }) {
  createConnection((connection) => {
    const request = new Request(`
      CREATE TABLE "#tmpTestTable" (
        "id" int NOT NULL
      )
    `, (err) => {
      if (err) {
        throw err;
      }

      let i = 0;

      bench.start();

      (function cb() {
        const bulkLoad = connection.newBulkLoad('#tmpTestTable', (err) => {
          if (err) {
            throw err;
          }

          if (i++ === n) {
            bench.end(n);

            connection.close();

            return;
          }

          cb();
        });

        bulkLoad.addColumn('id', TYPES.Int, { nullable: false });

        const rows = [];
        for (let j = 0; j < size; j++) {
          rows.push([ j ]);
        }
        connection.execBulkLoad(bulkLoad, rows);
      })();
    });

    connection.execSqlBatch(request);
  });
}
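For comparison, the rows in this benchmark could also be supplied lazily from a generator rather than being collected into an array first, since `execBulkLoad` accepts any `Iterable` or `AsyncIterable`. This is only an illustrative sketch reusing `connection`, `bulkLoad` and `size` from the benchmark above, not one of the files added by this commit:

```js
function* generateRows(size) {
  for (let j = 0; j < size; j++) {
    yield [ j ];
  }
}

// The generator is consumed as the bulk load writes packets,
// so only one row needs to exist in memory at a time.
connection.execBulkLoad(bulkLoad, generateRows(size));
```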
64 changes: 44 additions & 20 deletions src/bulk-load.ts
@@ -222,12 +222,11 @@ class RowTransform extends Transform {
* bulkLoad.addColumn('myInt', TYPES.Int, { nullable: false });
* bulkLoad.addColumn('myString', TYPES.NVarChar, { length: 50, nullable: true });
*
* // add rows
* bulkLoad.addRow({ myInt: 7, myString: 'hello' });
* bulkLoad.addRow({ myInt: 23, myString: 'world' });
*
* // execute
* connection.execBulkLoad(bulkLoad);
* connection.execBulkLoad(bulkLoad, [
* { myInt: 7, myString: 'hello' },
* { myInt: 23, myString: 'world' }
* ]);
* ```
*/
class BulkLoad extends EventEmitter {
@@ -372,10 +371,10 @@ class BulkLoad extends EventEmitter {
*
* @param name The name of the column.
* @param type One of the supported `data types`.
* @param __namedParameters Type [[ColumnOptions]]<p> Additional column type information. At a minimum, `nullable` must be set to true or false.
* @param __namedParameters Additional column type information. At a minimum, `nullable` must be set to true or false.
* @param length For VarChar, NVarChar, VarBinary. Use length as `Infinity` for VarChar(max), NVarChar(max) and VarBinary(max).
* @param nullable Indicates whether the column accepts NULL values.
* @param objName If the name of the column is different from the name of the property found on `rowObj` arguments passed to [[addRow]], then you can use this option to specify the property name.
* @param objName If the name of the column is different from the name of the property found on `rowObj` arguments passed to [[addRow]] or [[Connection.execBulkLoad]], then you can use this option to specify the property name.
* @param precision For Numeric, Decimal.
* @param scale For Numeric, Decimal, Time, DateTime2, DateTimeOffset.
*/
@@ -419,29 +418,49 @@
}

/**
* Adds a row to the bulk insert. This method accepts arguments in three different formats:
* Adds a row to the bulk insert.
*
* ```js
* bulkLoad.addRow( rowObj )
* bulkLoad.addRow( columnArray )
* bulkLoad.addRow( col0, col1, ... colN )`
* bulkLoad.addRow({ first_name: 'Bill', last_name: 'Gates' });
* ```
* * `rowObj`
*
* An object of key/value pairs representing column name (or objName) and value.
* @param row An object of key/value pairs representing column name (or objName) and value.
*
* * `columnArray`
* @deprecated This method is deprecated. Instead of adding rows individually, you should pass
* all row objects when calling [[Connection.execBulkLoad]]. This method will be removed in the future.
*/
addRow(row: { [columnName: string]: unknown }): void

/**
* Adds a row to the bulk insert.
*
* An array representing the values of each column in the same order which they were added to the bulkLoad object.
* ```js
* bulkLoad.addRow('Bill', 'Gates');
* ```
*
* * `col0, col1, ... colN`
* @param row If there are at least two columns, values can be passed as multiple arguments instead of an array. They
* must be in the same order the columns were added in.
*
* If there are at least two columns, values can be passed as multiple arguments instead of an array. They
* must be in the same order the columns were added in.
* @deprecated This method is deprecated. Instead of adding rows individually, you should pass
* all row objects when calling [[Connection.execBulkLoad]]. This method will be removed in the future.
*/
addRow(...row: unknown[]): void

/**
* Adds a row to the bulk insert.
*
* ```js
* bulkLoad.addRow(['Bill', 'Gates']);
* ```
*
* @param input
* @param row An array representing the values of each column in the same order which they were added to the bulkLoad object.
*
* @deprecated This method is deprecated. Instead of adding rows individually, you should pass
* all row objects when calling [[Connection.execBulkLoad]]. This method will be removed in the future.
*/
addRow(...input: [{ [key: string]: any }] | Array<any>) {
addRow(row: unknown[]): void

addRow(...input: [ { [key: string]: unknown } ] | unknown[]) {
  this.firstRowWritten = true;

  let row: any;
@@ -649,6 +668,11 @@ class BulkLoad extends EventEmitter {
*
* After that, the stream emits a ['drain' event](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_event_drain)
* when it is ready to resume data transfer.
*
* @deprecated
* This method is deprecated. Instead of writing rows to the stream returned by this method,
* you can pass any object that implements the `Iterable` or `AsyncIterable` interface (e.g. a `Readable`
* stream or an `AsyncGenerator`) when calling [[Connection.execBulkLoad]]. This method will be removed in the future.
*/
getRowStream() {
if (this.firstRowWritten) {
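Taken together, these changes deprecate `BulkLoad.addRow` and `BulkLoad.getRowStream` in favour of handing the rows to `Connection.execBulkLoad`. A rough before/after sketch, using the `myInt`/`myString` columns from the class documentation above:

```js
// Before (deprecated): rows are added to the BulkLoad one by one.
bulkLoad.addRow({ myInt: 7, myString: 'hello' });
bulkLoad.addRow({ myInt: 23, myString: 'world' });
connection.execBulkLoad(bulkLoad);

// After: the same rows are passed to execBulkLoad as an Iterable.
connection.execBulkLoad(bulkLoad, [
  { myInt: 7, myString: 'hello' },
  { myInt: 23, myString: 'world' }
]);
```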
102 changes: 98 additions & 4 deletions src/connection.ts
@@ -2841,14 +2841,106 @@ class Connection extends EventEmitter {
}

/**
* Executes a [[BulkLoad]].
* Execute a [[BulkLoad]].
*
* @param bulkLoad
* ```js
* // We want to perform a bulk load into a table with the following format:
* // CREATE TABLE employees (first_name nvarchar(255), last_name nvarchar(255), day_of_birth date);
*
* const bulkLoad = connection.newBulkLoad('employees', (err, rowCount) => {
* // ...
* });
*
* // First, we need to specify the columns that we want to write to,
* // and their definitions. These definitions must match the actual table,
* // otherwise the bulk load will fail.
 * bulkLoad.addColumn('first_name', TYPES.NVarChar, { nullable: false });
 * bulkLoad.addColumn('last_name', TYPES.NVarChar, { nullable: false });
 * bulkLoad.addColumn('day_of_birth', TYPES.Date, { nullable: false });
*
* // Now, we can specify each row to be written.
* //
* // Note that these rows are held in memory until the
 * // bulk load has been performed, so if you need to write a large
* // number of rows (e.g. by reading from a CSV file),
* // using a streaming bulk load is advisable to keep memory usage low.
* bulkLoad.addRow({ 'first_name': 'Steve', 'last_name': 'Jobs', 'day_of_birth': new Date('02-24-1955') });
* bulkLoad.addRow({ 'first_name': 'Bill', 'last_name': 'Gates', 'day_of_birth': new Date('10-28-1955') });
*
* connection.execBulkLoad(bulkLoad);
* ```
*
* @param bulkLoad A previously created [[BulkLoad]].
*
* @deprecated Adding rows to a [[BulkLoad]] via [[BulkLoad.addRow]] or [[BulkLoad.getRowStream]]
* is deprecated and will be removed in the future. You should migrate to calling [[Connection.execBulkLoad]]
 * with an `Iterable` or `AsyncIterable` as the second argument instead.
*/
execBulkLoad(bulkLoad: BulkLoad): void

/**
* Execute a [[BulkLoad]].
*
* ```js
* // We want to perform a bulk load into a table with the following format:
* // CREATE TABLE employees (first_name nvarchar(255), last_name nvarchar(255), day_of_birth date);
*
* const bulkLoad = connection.newBulkLoad('employees', (err, rowCount) => {
* // ...
* });
*
* // First, we need to specify the columns that we want to write to,
* // and their definitions. These definitions must match the actual table,
* // otherwise the bulk load will fail.
 * bulkLoad.addColumn('first_name', TYPES.NVarChar, { nullable: false });
 * bulkLoad.addColumn('last_name', TYPES.NVarChar, { nullable: false });
 * bulkLoad.addColumn('day_of_birth', TYPES.Date, { nullable: false });
*
* // Execute a bulk load with a predefined list of rows.
* //
* // Note that these rows are held in memory until the
 * // bulk load has been performed, so if you need to write a large
* // number of rows (e.g. by reading from a CSV file),
* // passing an `AsyncIterable` is advisable to keep memory usage low.
* connection.execBulkLoad(bulkLoad, [
* { 'first_name': 'Steve', 'last_name': 'Jobs', 'day_of_birth': new Date('02-24-1955') },
* { 'first_name': 'Bill', 'last_name': 'Gates', 'day_of_birth': new Date('10-28-1955') }
* ]);
* ```
*
* @param bulkLoad A previously created [[BulkLoad]].
 * @param rows An [[Iterable]] or [[AsyncIterable]] that contains the rows that should be bulk loaded.
*/
execBulkLoad(bulkLoad: BulkLoad) {
execBulkLoad(bulkLoad: BulkLoad, rows: AsyncIterable<unknown[] | { [columnName: string]: unknown }> | Iterable<unknown[] | { [columnName: string]: unknown }>): void

execBulkLoad(bulkLoad: BulkLoad, rows?: AsyncIterable<unknown[] | { [columnName: string]: unknown }> | Iterable<unknown[] | { [columnName: string]: unknown }>) {
  bulkLoad.executionStarted = true;

  if (!bulkLoad.streamingMode) {
  if (rows) {
    if (bulkLoad.streamingMode) {
      throw new Error("Connection.execBulkLoad can't be called with a BulkLoad that was put in streaming mode.");
    }

    if (bulkLoad.firstRowWritten) {
      throw new Error("Connection.execBulkLoad can't be called with a BulkLoad that already has rows written to it.");
    }

    const rowStream = Readable.from(rows);

    // Destroy the packet transform if an error happens in the row stream,
    // e.g. if an error is thrown from within a generator or stream.
    rowStream.on('error', (err) => {
      bulkLoad.rowToPacketTransform.destroy(err);
    });

    // Destroy the row stream if an error happens in the packet transform,
    // e.g. if the bulk load is cancelled.
    bulkLoad.rowToPacketTransform.on('error', (err) => {
      rowStream.destroy(err);
    });

    rowStream.pipe(bulkLoad.rowToPacketTransform);
  } else if (!bulkLoad.streamingMode) {
    // If the bulkload was not put into streaming mode by the user,
    // we end the rowToPacketTransform here for them.
    //
@@ -3281,6 +3373,8 @@
  // Only set a request error if no error was set yet.
  request.error ??= error;

  payloadStream.unpipe(message);

  message.ignore = true;
  message.end();
});
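The error wiring above is what makes streaming sources practical: an error thrown from the row source destroys the packet transform, and a cancelled bulk load destroys the row source, so failures on either side should surface through the bulk load's completion callback. As a replacement for the deprecated `getRowStream`, an async generator (or any `Readable` stream) can be passed directly. A hedged sketch, where `readRecords()` is a hypothetical asynchronous row source:

```js
async function* rows() {
  // readRecords() stands in for any asynchronous source of records,
  // e.g. a CSV parser or a paginated HTTP API.
  for await (const record of readRecords()) {
    yield {
      first_name: record.firstName,
      last_name: record.lastName,
      day_of_birth: record.dayOfBirth
    };
  }
}

const bulkLoad = connection.newBulkLoad('employees', (err, rowCount) => {
  if (err) {
    // Errors thrown inside rows() are expected to arrive here,
    // alongside any server-side bulk load errors.
    console.error(err);
    return;
  }

  console.log(`inserted ${rowCount} rows`);
});

bulkLoad.addColumn('first_name', TYPES.NVarChar, { length: 255, nullable: false });
bulkLoad.addColumn('last_name', TYPES.NVarChar, { length: 255, nullable: false });
bulkLoad.addColumn('day_of_birth', TYPES.Date, { nullable: false });

// The async generator is consumed via Readable.from() and piped into
// the bulk load's packet transform, keeping memory usage low.
connection.execBulkLoad(bulkLoad, rows());
```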