Search code examples
node.js · amazon-web-services · elasticsearch · https · bulk

AWS Elasticsearch HTTP request: error 'The bulk request must be terminated by a newline'


I followed this link to create a bulk HTTP request, sending the result of JSON.stringify(body) as the request body, where body looks like this:

// Bulk payload built as a single array: each `index` action object is
// immediately followed by the document it refers to.
const body = [ { index: { _index: 'image_2', _type: '_doc', _id: 0 } },
{ imageKey: 'test' },
{ index: { _index: 'image_2', _type: '_doc', _id: 1 } },
{ imageKey: 'test2' } ]

but I keep getting the error

{ statusCode: 400,
body: '{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"The bulk request must be terminated by a newline [\\\\n]"}],"type":"illegal_argument_exception","reason":"The bulk request must be terminated by a newline [\\\\n]"},"status":400}' }

I have already tried the following, and it does not work either:

// Attempt to append a trailing newline to the stringified array.
// NOTE(review): as written this line does not parse — the `\n` sits inside the
// `${}` expression — and `JSON.stringified` is not a standard JSON method
// (presumably `JSON.stringify` was meant).
const body = `${JSON.stringified(body)\n}`;

Any ideas are appreciated :)

Here is my AWS Elasticsearch request function:

/**
 * Sends a Signature V4 signed HTTP request to an Elasticsearch domain using
 * the AWS SDK's low-level HTTP client.
 *
 * @param {string} elasticsearchDomain - Host of the domain; also sent as the Host header.
 * @param {string} endpointPath - Path appended to the request's base path (e.g. `_bulk`).
 * @param {object} [options]
 * @param {string|Buffer} [options.body] - Payload, sent exactly as given.
 * @param {string} [options.method='GET'] - HTTP method.
 * @param {string} [region=process.env.AWS_REGION] - Region passed to the request for signing.
 * @returns {Promise<{statusCode: number, body: string}>} Resolves with the status
 *   code and the complete response body as text; rejects on a transport error.
 */
function elasticsearchFetch(elasticsearchDomain, endpointPath, options = {}, region = process.env.AWS_REGION) {
  return new Promise((resolve, reject) => {
    const { body, method = 'GET' } = options;

    const endpoint = new AWS.Endpoint(elasticsearchDomain);
    const request = new AWS.HttpRequest(endpoint, region);
    request.method = method;
    request.path += endpointPath;
    request.headers.host = elasticsearchDomain;
    if (body) {
      request.body = body;
      request.headers['Content-Type'] = 'application/json';
      // Content-Length is a byte count. String#length counts UTF-16 code units
      // and under-reports any payload that contains non-ASCII characters.
      request.headers['Content-Length'] = Buffer.byteLength(body);
    }

    // Headers and body must be final before signing: the signature covers them.
    const credentials = new AWS.EnvironmentCredentials('AWS');
    const signer = new AWS.Signers.V4(request, 'es');
    signer.addAuthorization(credentials, new Date());

    const client = new AWS.HttpClient();
    client.handleRequest(request, null, (res) => {
      // Accumulator is local to this response. Previously `chunks` was appended
      // to without being declared in this function, which throws a
      // ReferenceError on the first 'data' event.
      const chunks = [];
      res.on('data', (chunk) => {
        // Keep raw bytes and decode once at the end so a multi-byte character
        // split across two chunks is not corrupted.
        chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
      });
      res.on('end', () => resolve({
        statusCode: res.statusCode,
        body: Buffer.concat(chunks).toString('utf8'),
      }));
      res.on('error', (error) => reject(error));
    }, (error) => reject(error));
  });
}

And here is the code for my Lambda function, which uses the request signer above:

// Request helper module; the handler below calls its `elasticsearch` member.
const elasticsearchFetch = require('rms-library/fetch');
/**
 * Lambda handler: builds a bulk-index payload from `products` and sends it to
 * the `_bulk` endpoint of the domain named in ELASTIC_SEARCH_DOMAIN.
 *
 * @param {object} event
 * @param {*} event.imageID - Used as the target `_index` of every action.
 * @param {*} event.bulk - Destructured but not used in this handler.
 * @param {Array} event.products - Documents to index; the array position becomes `_id`.
 * @throws {Error} When `products` is not an array or `imageID` is falsy.
 */
exports.handler = async ({ imageID, bulk, products }) => {
  if (!Array.isArray(products) || !imageID) {
    throw new Error('error in bulk operation');
  }
  const bulkList = [];
  // One `index` action object followed by the document itself, per product.
  products.forEach((product, i) => {
    bulkList.push({ index: { _index: imageID, _type: '_doc', _id: i } });
    bulkList.push(product);
  });
  // Appends an empty string as the last element — presumably intended to
  // produce a trailing newline; it does not, see the note on the body below.
  bulkList.push('');
  console.log('bulkList', bulkList);
  const result = await elasticsearchFetch.elasticsearch(
    process.env.ELASTIC_SEARCH_DOMAIN,
    '_bulk',
    // NOTE(review): JSON.stringify on the whole list yields one JSON array on a
    // single line (ending in `""]`), not one JSON object per line with a final
    // newline — consistent with the "must be terminated by a newline" error
    // reported above.
    { method: 'PUT', body: JSON.stringify(bulkList) },
  );
  console.log(result);
};

Solution

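  • OK, first of all, you need to use POST instead of PUT with the _bulk endpoint, and the body cannot be a stringified JSON array: it must be newline-delimited JSON, i.e. one JSON object per line with a newline after the last line. Also remove the `bulkList.push('')` call from your handler, since the empty string would otherwise be serialized as an extra `""` line.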

    Try like this instead:

    // Serialize each entry on its own line and terminate the payload with a
    // newline, which is what the error message from the _bulk endpoint asks for.
    const serializedLines = bulkList.map((entry) => JSON.stringify(entry));
    const ndjsonBody = `${serializedLines.join('\n')}\n`;
    const result = await elasticsearchFetch.elasticsearch(
      process.env.ELASTIC_SEARCH_DOMAIN,
      '_bulk',
      { method: 'POST', body: ndjsonBody },
    );