Search code examples
pythoncsvquotingwritercsvwriter

Cannot properly handle csv.writer escapechar and quoting parameters


I am developing a python script which, given:

  • the path of a csv file
  • a list of names of some of the columns of the csv file
  • a string to-be-replaced (A)
  • a string to-replace-with (B)

should:
substitute string A with string B in the cells of the indicated columns.

However, I am having trouble writing the changed rows to the output csv: csv.writer encloses the modified lines in double quotes, which I don't want, and I don't understand how to handle the quoting.

Note: I cannot use pandas.

example:

source file:

code1;code2;money1;code3;type_payment;money2
74;1;185.04;10;AMEXCO;36.08
74;1;8.06;11;MASTERCARD;538.30
74;1;892.46;12;VISA;185.04
74;1;75.10;15;MAESTRO;8.06
74;1;63.92;16;BANCOMAT;892.46

desired output:

code1;code2;money1;code3;type_payment;money2
74;1;185,04;10;AMEXCO;36,08
74;1;8,06;11;MASTERCARD;538,30
74;1;892,46;12;VISA;185,04
74;1;75,10;15;MAESTRO;8,06
74;1;63,92;16;BANCOMAT;892,46

actual output:

code1;code2;money1;code3;type_payment;money2
"74;1;185,04;10;AMEXCO;36,08"
"74;1;8,06;11;MASTERCARD;538,30"
"74;1;892,46;12;VISA;185,04"
"74;1;75,10;15;MAESTRO;8,06"
"74;1;63,92;16;BANCOMAT;892,46"

my script:

result = {}

csv_file_path = 'myreport.csv'

columns_to_process = ['money1', 'money2']

string_to_be_replaced = "."

string_to_replace_with = ","

mydelimiter =  ";"

#--------------------------

# specific import for csv
import csv, io
# import operator
import shutil

# specific import to manage errors
import os, traceback

# define custom errors
class DelimiterManagementError(Exception):
    """Raised when a row, split on the delimiter, does not yield as many cells as the header.

    The header is taken as the reference for the expected number of columns,
    on the assumption that no header cell contains the delimiter itself.
    """


# check file existence
if not os.path.isfile(csv_file_path):
    raise IOError("csv_file_path is not valid or does not exists: {}".format(csv_file_path))

# check the delimiter existence
with open(csv_file_path, 'r') as csvfile:
    first_line = csvfile.readline()
    if mydelimiter not in first_line:
        delimiter_warning_message = "No delimiter found in file first line."
        # `result` starts out empty, so the warnings list is created on first
        # use instead of being indexed directly (which raised KeyError).
        result.setdefault('warning_messages', []).append(delimiter_warning_message)

# count the lines in the source file; the context manager closes the handle,
# which the previous bare io.open() call never did
with open(csv_file_path, 'r') as csvfile:
    NOL = sum(1 for _ in csvfile)

# print("NOL:", NOL)

# if NOL = 0 -> void file
# if NOL = 1 -> header-only file

if NOL > 0:

    # just get the column names, then close the file
    #-----------------------------------------------------
    # The names are read from DictReader.fieldnames (the header row itself)
    # rather than from the keys of the first data row: a header-only file has
    # no data row to inspect, and dict keys would merge duplicated column
    # names and shift the computed column indexes.
    with open(csv_file_path, 'r', newline='') as csvfile:
        columnslist = csv.DictReader(csvfile, delimiter=mydelimiter)
        list_of_column_names = list(columnslist.fieldnames or [])

    number_of_columns = len(list_of_column_names)

    # every requested column must be present in the header
    #-----------------------------------------------------
    if any(column_name not in list_of_column_names for column_name in columns_to_process):
        raise ValueError("File {} does not contains all the columns given in input for processing:\nFile columns names: {}\nInput columns names: {}".format(csv_file_path, list_of_column_names, columns_to_process))

    # positions (in header order) of the columns that have to be processed
    indexes_of_columns_to_process = []
    for position, header_name in enumerate(list_of_column_names):
        if header_name in columns_to_process:
            indexes_of_columns_to_process.append(position)

    print("indexes_of_columns_to_process: ", indexes_of_columns_to_process)

    # the output is first written next to the input, as <name>__output<ext>
    path_stem, path_suffix = os.path.splitext(csv_file_path)
    csv_output_file_path = ''.join([path_stem, '__output', path_suffix])

    # define the processing function
    def replace_string_in_columns(input_csv, output_csv, indexes_of_columns_to_process, string_to_be_replaced, string_to_replace_with):
        """Copy input_csv to output_csv, replacing a substring in the selected columns.

        Relies on the module-level names `mydelimiter` and `number_of_columns`.

        Parameters:
            input_csv: path of the csv file to read.
            output_csv: path of the csv file to write.
            indexes_of_columns_to_process: positions of the cells to inspect in each row.
            string_to_be_replaced: substring searched for in each selected cell.
            string_to_replace_with: text that replaces every occurrence found.

        Returns:
            The number of cells in which a replacement was made. The first row
            (row_index 0, the header) is never modified.

        Raises:
            DelimiterManagementError: when a line, split on `mydelimiter`, does
                not have exactly `number_of_columns` cells.
        """

        number_of_replacements = 0

        with open(input_csv, 'r', newline='') as infile, open(output_csv, 'w', newline='') as outfile:
            # NOTE(review): reader and writer are created with the csv module
            # defaults (comma delimiter, minimal quoting). A ';'-separated line
            # that contains no comma therefore arrives as ONE field, row[0],
            # which is why it is split by hand below. Once the replacement puts
            # a comma into that single field, the writer has to quote the whole
            # field on output -- this is the source of the unwanted double
            # quotes described in the question.
            reader = csv.reader(infile)
            writer = csv.writer(outfile)
            # writer = csv.writer(outfile, delimiter=mydelimiter, escapechar='\\', quoting=csv.QUOTE_NONE)

            row_index=0

            for row in reader:

                for col_index in indexes_of_columns_to_process:

                    # stop processing this row when it is empty (e.g. blank lines at the end of the file)
                    if len(row) == 0:
                        break

                    # get the single cell and analyze it
                    #-------------------------------------

                    # the whole line is in row[0]; split it into cells manually
                    list_of_cells = row[0].split(mydelimiter)

                    # WARNING: in case the inspected cell contains the delimiter, the split will return more columns.
                    if len(list_of_cells) != number_of_columns:
                        raise DelimiterManagementError("In row {}: {}, the number of splitted cells is {}, but the number of columns (in header) is {}.".format(row_index, row, len(list_of_cells), number_of_columns))

                    cell = list_of_cells[col_index]
                    columns_before = list_of_cells[:col_index]
                    columns_after = list_of_cells[(col_index + 1):]


                    print("col_index: ", col_index)
                    print("row: ", row)
                    # print("list_of_cells: ", list_of_cells)
                    print("cell: ", cell)
                    # print("columns_before: ", columns_before)
                    # print("columns_after: ", columns_after)

                    # row_index 0 is the header and is left untouched
                    if string_to_be_replaced in cell and row_index != 0:
                        # do the substitution in the cell
                        cell = cell.replace(string_to_be_replaced, string_to_replace_with)
                        number_of_replacements = number_of_replacements + 1
                        print("number_of_replacements: ", number_of_replacements)

                        # sew the row up again
                        list_of_cells_replaced = columns_before + [ cell ] + columns_after

                        string_of_cells_replaced = mydelimiter.join(list_of_cells_replaced)

                        # the row is rebuilt as a single-field list, so the next
                        # selected column is again read from row[0]
                        row_of_cells_replaced = [ string_of_cells_replaced ]
                        row = row_of_cells_replaced

                        print("substitutiion done: ", cell)
                        print("list_of_cells_replaced: ", list_of_cells_replaced)
                        print("string_of_cells_replaced: ", string_of_cells_replaced)


                # write / copy the row in the new file
                writer.writerow(row)
                print("written row: ", row, "index: ", row_index)

                row_index=row_index+1

        return number_of_replacements


    # run the replacement, writing the result to the temporary output file
    result['number_of_modified_cells'] = replace_string_in_columns(
        csv_file_path,
        csv_output_file_path,
        indexes_of_columns_to_process,
        string_to_be_replaced,
        string_to_replace_with,
    )

    # overwrite the source csv with the processed copy, then drop the copy
    shutil.copyfile(csv_output_file_path, csv_file_path)
    os.remove(csv_output_file_path)

    # the file counts as changed as soon as one cell was modified
    result['changed'] = result['number_of_modified_cells'] > 0
    
else:

    result['changed'] = False


result['source_csv_number_of_raw_lines'] = NOL
# Data lines are all lines except the header; an empty file has no header to
# subtract, so the count is clamped at 0 instead of reporting -1.
result['source_csv_number_of_lines'] = max(NOL - 1, 0)

print("result:\n\n", result)

Solution

  • Everything got so much more complicated just because I called csv.reader without giving the parameters delimiter, quotechar, escapechar.

    So I turned the lines

    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    

    into

    reader = csv.reader(infile, quoting=csv.QUOTE_NONE, delimiter=mydelimiter, quotechar='',escapechar='\\')
    writer = csv.writer(outfile, quoting=csv.QUOTE_NONE, delimiter=mydelimiter, quotechar='',escapechar='\\')
    

    and then rewrote the rest of the script.

    Now it works and it is much less complicated, so that I don't have to define a custom error to be raised in case the text of any cell of the csv contains the delimiter.

    Lesson learned: when using a library, always read its documentation before starting to write the code.

    result = {}
    
    csv_file_path = 'myreport.csv'
    
    columns_to_process = ['money1', 'money2']
    
    string_to_be_replaced = "."
    
    string_to_replace_with = ","
    
    mydelimiter =  ";"
    
    #--------------------------
    
    # specific import for csv
    import csv, io
    # import operator
    import shutil
    
    # specific import to manage errors
    import os, traceback
    
    
    
    
    # check file existence
    if not os.path.isfile(csv_file_path):
        raise IOError("csv_file_path is not valid or does not exists: {}".format(csv_file_path))

    # check the delimiter existence
    with open(csv_file_path, 'r') as csvfile:
        first_line = csvfile.readline()
        if mydelimiter not in first_line:
            delimiter_warning_message = "No delimiter found in file first line."
            # `result` starts out empty, so the warnings list is created on first
            # use instead of being indexed directly (which raised KeyError).
            result.setdefault('warning_messages', []).append(delimiter_warning_message)

    # count the lines in the source file; the context manager closes the handle,
    # which the previous bare io.open() call never did
    with open(csv_file_path, 'r') as csvfile:
        NOL = sum(1 for _ in csvfile)
    
    # print("NOL:", NOL)
    
    # if NOL = 0 -> void file
    # if NOL = 1 -> header-only file
    
    if NOL > 0:
    
        # just get the column names, then close the file
        #-----------------------------------------------------
        # The names are read from DictReader.fieldnames (the header row itself)
        # rather than from the keys of the first data row: a header-only file has
        # no data row to inspect, and dict keys would merge duplicated column
        # names and shift the computed column indexes.
        with open(csv_file_path, 'r', newline='') as csvfile:
            columnslist = csv.DictReader(csvfile, delimiter=mydelimiter)
            list_of_column_names = list(columnslist.fieldnames or [])

        number_of_columns = len(list_of_column_names)
    
        # every requested column must be present in the header
        #-----------------------------------------------------
        if any(column_name not in list_of_column_names for column_name in columns_to_process):
            raise ValueError("File {} does not contains all the columns given in input for processing:\nFile columns names: {}\nInput columns names: {}".format(csv_file_path, list_of_column_names, columns_to_process))

        # positions (in header order) of the columns that have to be processed
        indexes_of_columns_to_process = []
        for position, header_name in enumerate(list_of_column_names):
            if header_name in columns_to_process:
                indexes_of_columns_to_process.append(position)

        print("indexes_of_columns_to_process: ", indexes_of_columns_to_process)

        # the output is first written next to the input, as <name>__output<ext>
        path_stem, path_suffix = os.path.splitext(csv_file_path)
        csv_output_file_path = ''.join([path_stem, '__output', path_suffix])
    
        # define the processing function
        def replace_string_in_columns(input_csv, output_csv, indexes_of_columns_to_process, string_to_be_replaced, string_to_replace_with, delimiter=None):
            """Copy input_csv to output_csv, replacing a substring in the selected columns.

            The first row is treated as the header and copied unchanged. Both
            files are handled without any quoting (csv.QUOTE_NONE), with a
            backslash as the escape character.

            Parameters:
                input_csv: path of the csv file to read.
                output_csv: path of the csv file to write.
                indexes_of_columns_to_process: positions of the cells to inspect in each row.
                string_to_be_replaced: substring searched for in each selected cell.
                string_to_replace_with: text that replaces every occurrence found.
                delimiter: field separator; when None the module-level
                    `mydelimiter` is used, as before.

            Returns:
                The number of cells in which a replacement was made.
            """
            if delimiter is None:
                delimiter = mydelimiter

            # quotechar must be None, not '': since Python 3.11 the csv module
            # rejects an empty quotechar with a TypeError, while None is the
            # documented way to disable it together with QUOTE_NONE.
            csv_options = dict(quoting=csv.QUOTE_NONE, delimiter=delimiter, quotechar=None, escapechar='\\')

            number_of_replacements = 0

            with open(input_csv, 'r', newline='') as infile, open(output_csv, 'w', newline='') as outfile:
                reader = csv.reader(infile, **csv_options)
                writer = csv.writer(outfile, **csv_options)

                for row_index, row in enumerate(reader):

                    # row 0 is the header: copy it as it is
                    if row_index != 0:
                        for col_index in indexes_of_columns_to_process:
                            # blank or truncated lines have no cell at this
                            # position; leave them untouched instead of failing
                            if col_index >= len(row):
                                continue

                            cell = row[col_index]
                            if string_to_be_replaced in cell:
                                row[col_index] = cell.replace(string_to_be_replaced, string_to_replace_with)
                                number_of_replacements += 1

                    # write / copy the row in the new file
                    writer.writerow(row)

            return number_of_replacements
    
    
        # run the replacement, writing the result to the temporary output file
        result['number_of_modified_cells'] = replace_string_in_columns(
            csv_file_path,
            csv_output_file_path,
            indexes_of_columns_to_process,
            string_to_be_replaced,
            string_to_replace_with,
        )

        # overwrite the source csv with the processed copy, then drop the copy
        shutil.copyfile(csv_output_file_path, csv_file_path)
        os.remove(csv_output_file_path)

        # the file counts as changed as soon as one cell was modified
        result['changed'] = result['number_of_modified_cells'] > 0
        
    else:
    
        result['changed'] = False
    
    
    result['source_csv_number_of_raw_lines'] = NOL
    # Data lines are all lines except the header; an empty file has no header to
    # subtract, so the count is clamped at 0 instead of reporting -1.
    result['source_csv_number_of_lines'] = max(NOL - 1, 0)

    print("result:\n\n", result)