csv apache-spark encoding salesforce

Getting weird characters in CSV, making it unreadable for Spark


I am using the Salesforce Bulk API 2.0 to fetch data for Salesforce objects. I wrote the following function for this:

/**
 * Fetches results from a Job ID of Batch query.
 * NOTE : Provided Job ID must be in "JobComplete" state to fetch results
 *
 * @param jobId      Job ID
 * @param maxRecords Number of records to fetch in one chunk of API hit
 * @return {@link String} CSV-formatted String/results (text/csv)
 * @throws IOException Exception in mapping object
 * @author ujjawal pandey
 */
public String getJobResults(String jobId, String maxRecords) throws IOException {
    String getJobInfoUrl = String.format(JobResourcePath.getJobResultPath(), this.apiUrl,
            this.apiVersion, jobId, maxRecords);
    String sforceLocator = null;
    String filePath = jobId + ".csv";
    boolean isFirstDatasetFetch = true;
    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
            Files.newOutputStream(Paths.get(filePath)),
            StandardCharsets.UTF_8));
    do {
        String getJobInfoUrlLocator = getJobInfoUrl;
        HttpClient httpClient = HttpClientBuilder.create().build();
        if (sforceLocator != null) {
            getJobInfoUrlLocator = getJobInfoUrl + "&locator=" + sforceLocator;
        }

        LOGGER.info(getJobInfoUrlLocator);
        HttpGet httpGet = new HttpGet(getJobInfoUrlLocator);
        httpGet.setHeader("Authorization", "Bearer " + accessToken);

        HttpResponse response = httpClient.execute(httpGet);
        LOGGER.info(response.toString());
        int responseCode = response.getStatusLine().getStatusCode();
        String responseBody = "";

        if (responseCode == Constants.HTTP_STATUS_OK) {
            org.apache.http.HttpEntity entity = response.getEntity();
            sforceLocator = response.getFirstHeader("Sforce-Locator").getValue();
            LOGGER.info("Locator is: " + sforceLocator);

            responseBody = EntityUtils.toString(entity);

            LOGGER.debug(String.valueOf(responseBody));
            if (isFirstDatasetFetch) {
                writer.write(responseBody);
                isFirstDatasetFetch = false;
            } else {
                writer.write(responseBody.substring(
                        responseBody.indexOf('\n') + 1));
            }
        } else {
            LOGGER.error(responseBody);
            throw new RuntimeException(responseBody);
        }
    } while (sforceLocator != null && !sforceLocator.equals("null"));
    writer.close();
    return filePath;
}

The CSV that gets created has the correct format, but some columns contain weird characters, for example ⢠(there is a euro sign between the two; pasting makes it disappear).

When I read the file in Spark with the following configuration:

spark.read()
     .option("header", "true")
     .option("delimiter", ",")
     .option("lineSep", "\n")
     .option("multiLine", "true")
     .option("encoding", "UTF-8")
     .csv(hdfsTempCsvStoragePath + "/" + csvPath);

I get 4 extra rows, probably because of these characters. Please find the screenshot attached.

Screenshot of dataframe

I know the issue is related to encoding, but I don't understand it well.

  1. What underlying concept am I missing that causes this issue?
  2. What would be the best approach to solve it?

Since I think I am missing something, I haven't tried much. As a last resort, I could clean the CSV so that Spark can read it.


Solution

  • Actually, the extra rows were not caused by those characters but by the line above them, which contained double quotes that were not escaped the way Spark expects by default. Spark could not parse the CSV correctly and split the lines below it into multiple rows. Setting the escape character to a double quote fixed it:

    spark.read()
         .option("header", "true")
         .option("delimiter", ",")
         .option("multiLine", "true")
         .option("encoding", "UTF-8")
         .option("escape", "\"")
         .csv(hdfsTempCsvStoragePath + "/" + csvPath);
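
For context, here is a minimal sketch of the quoting convention involved, assuming the export follows the RFC 4180 style in which a literal double quote inside a quoted field is written as two double quotes. Spark's CSV reader uses a backslash as its default escape character, so it does not treat the doubled quote as an escaped one unless the escape option is set to a double quote. The class and the `quoteField` and `unquoteField` helpers are hypothetical names made up for illustration; they are not part of Spark or the Salesforce API.

```java
public class CsvQuoteDemo {

    // Hypothetical helper: wrap a value in quotes and double any embedded quotes.
    public static String quoteField(String value) {
        return "\"" + value.replace("\"", "\"\"") + "\"";
    }

    // Hypothetical helper: strip the outer quotes and collapse doubled quotes.
    // Assumes the input is a single, well-formed quoted field.
    public static String unquoteField(String field) {
        String inner = field.substring(1, field.length() - 1);
        return inner.replace("\"\"", "\"");
    }

    public static void main(String[] args) {
        String raw = "He said \"hi\", then left";
        String quoted = quoteField(raw);

        // Prints: "He said ""hi"", then left"
        System.out.println(quoted);

        // Round trip restores the original value, so this prints: true
        System.out.println(unquoteField(quoted).equals(raw));
    }
}
```

A reader that expects `\"` instead of `""` sees the field end at the first doubled quote, which is one way a single record can spill into several rows.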
    

    Also, those weird characters were actually bullets ("•"); they only looked like that because of an encoding mismatch in my function. I was writing the CSV as UTF-8, but EntityUtils.toString(entity) was decoding the response body with a different charset, so I forced the conversion to UTF-8 with EntityUtils.toString(entity, "UTF-8").
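
The mismatch can be reproduced without any HTTP call. The sketch below assumes the wrong charset was ISO-8859-1, which is what `EntityUtils.toString(entity)` falls back to in Apache HttpClient 4.x when the response declares no charset; the class and the `misdecode` helper are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;

public class MojibakeDemo {

    // Hypothetical helper: encode text as UTF-8, then decode the bytes as ISO-8859-1.
    public static String misdecode(String text) {
        byte[] utf8 = text.getBytes(StandardCharsets.UTF_8);
        return new String(utf8, StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        String bullet = "\u2022"; // the bullet character, bytes E2 80 A2 in UTF-8

        String wrong = misdecode(bullet);
        // Each byte becomes its own character: U+00E2, U+0080 (invisible control), U+00A2.
        System.out.println(wrong.length()); // prints 3

        // Writing the wrong string back out as UTF-8 bakes the damage into the file.
        System.out.println(wrong.getBytes(StandardCharsets.UTF_8).length); // prints 6

        // Decoding with the right charset restores the bullet.
        String right = new String(bullet.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);
        System.out.println(right.equals(bullet)); // prints true
    }
}
```

The middle byte, 0x80, is a control character in ISO-8859-1 but the euro sign in Windows-1252, which would explain a euro that shows up in some viewers and vanishes when pasted.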