The await
commands that I have commented with //******This await does not work */
do not seem to work. Not sure if this is something related to the fact they are in an event stream or a problem with the promise in the imported module.
When I call the run
function from within a mapped array to insert data from multiple sources, the run
function returns immediately rather than waiting until knex
has completed inserting the data.
app.get("/api/pull/all_data", (req, res)=>{
const dataSources = [
{resource: 'DI_ORDER_TYPE', tableName: 'ln_order_type'},
{resource: 'DI_DATES', tableName: 'ln_dates'},
{resource: 'WHINR140_INVENTORY', tableName: 'ln_inventory'}
]
dataSources.map(async (ds)=>{
console.log(`Importing ${ds.resource} into table ${ds.tableName}`)
await get_all_data(ds.tableName, ds.resource)
})
console.log("Import complete")
})
Here is my run
function which is being called from the code above.
const request = require('request')
const JSONStream = require('JSONStream')
const es = require('event-stream')
const knex_ln = require('../knex_ln')
const insertData = require('../insert_data')
const create_table = require('../create_table_from_json.js')
const clean_fieldnames = require('../clean_fieldnames')
function run(tableName, resourceName) {
return new Promise(async (resolve, reject)=>{
let tableData = []
let recordCount = 0
let maxRecords = 10000
let totalRecords = 0
// let tableName = 'LN_di_order_type'
// let resourceName = 'DI_ORDER_TYPE'
let rowData = {}
//Delete Existing Data and wait for it to complete
await knex_ln.schema.hasTable(tableName).then(async (exists)=>{
if(exists){
try {
await knex_ln(tableName).delete().then(()=>{})
} catch (error) {
}
}
})
//Get LN replica data and pipe data into JSONStream
request(`${process.env.API_BASE_URL}/${process.env.SECURITY_NAME}/${resourceName}`,
{
auth: {
'user': process.env.API_USER,
'pass': process.env.API_PASS
}
}
)
.pipe(JSONStream.parse([true, {recurse: true}, `${process.env.SECURITY_NAME}.row`, true]))
.pipe(es.mapSync(async (row)=>{
rowData = row
let cleanData = await clean_fieldnames(row)
tableData.push(cleanData)
recordCount += 1
totalRecords += 1
if(recordCount >= maxRecords){
try {
//******This await does not work */
await create_table(tableName, row)
} catch (error) {
console.log("Unable to create table", error)
}
//Insert records
try {
//******This await does not work */
await insertData(tableName, tableData)
console.log(`inserting ${recordCount} records into table ${tableName}`)
} catch (error) {
console.log("Unable to insert data: ", error)
}
//Reset tracker variables
recordCount = 0
tableData = []
}
}))
.on('end', async ()=>{
await create_table(tableName, rowData)
await insertData(tableName, tableData)
console.log(`Inserted ${totalRecords} into table ${tableName}`)
resolve('OK')
})
.on('error',(err)=>{
reject(err)
})
})
}
module.exports = run
Here is my module file which returns a promise
//insert_data.js
const knex_ln = require('./knex_ln')
module.exports = async (tableName, tableData) =>
new Promise(async (resolve, reject) => {
try {
await knex_ln(tableName).insert(tableData)
console.log("Inserting Data: ", tableData.length)
resolve()
} catch (error) {
console.log("Error inserting data: ", err)
reject(err)
}
})
Here is an example of the output
Importing DI_ORDER_TYPE into table ln_order_type
Importing DI_DATES into table ln_dates
Importing WHINR140_INVENTORY into table ln_inventory
Importing WHWMD210_WAREHOUSE_ITEM_DATA into table ln_warehouse_item_data
Importing TDIPU010_ITEM_BUY_FROM_BP_INFORMATION into table ln_item_buy_from_bp_information
Importing TDIPU001_ITEM_PURCHASE_DATA into table ln_item_purchase_data
Importing TDPCG031_PRICE_BOOKS into table ln_price_books
Importing TDPUR300_PURCHASE_CONTRACTS into table ln_purchase_contracts
Importing TDPUR301_PURCHASE_CONTRACT_LINES into table ln_purchase_contract_lines
Inserted 72 records into table ln_order_type
Inserted 217 records into table ln_purchase_contracts
inserting 10000 records into table ln_inventory
Inserted 4694 records into table ln_purchase_contract_lines
inserting 10000 records into table ln_item_buy_from_bp_information
inserting 10000 records into table ln_dates
inserting 10000 records into table ln_inventory
inserting 10000 records into table ln_price_books
inserting 10000 records into table ln_item_purchase_data
inserting 10000 records into table ln_inventory
inserting 10000 records into table ln_price_books
inserting 10000 records into table ln_dates
inserting 10000 records into table ln_inventory
inserting 10000 records into table ln_price_books
inserting 10000 records into table ln_item_purchase_data
The solution for me was to use bluebird Promise.each
This will process each of the items in array dataSources and wait for the promise to return before processing the next item in the list.
Promise.each(dataSources, function(ds){
....
}).then(()=>{
....
})