I am cleaning up a massive CSV data dump. I was able to split the single large file into smaller ones using gawk, initially following a Unix StackExchange query with this flow:

BIG CSV file -> gawk script + bash -> small CSV files based on columns

This flow produced about 12 split CSV files, each with ~170K lines in them.
I am using Python 3.7.7 on a Windows 10 machine.
Code
```python
import concurrent.futures

def convert_raw_data(incoming_line, f_name, line_counter):
    # do some decoding magic
    # catch exceptions and try to log them into a logger file under `f_name.log`
    ...

def convert_files(dir_name, f_name, dest_dir_name):
    # Open the source CSV file
    # Open the destination CSV file to store decoded data
    line_counter = 1
    for line in csv_reader:
        # convert raw HEX to floating point values using `convert_raw_data`
        line_counter = line_counter + 1
        status = convert_raw_data(line, f_name, line_counter)
    if status:
        return f'All good for {f_name}.'
    else:
        return f'Failed for {f_name}'

def main():
    # Parse arguments logic here
    # get CSV files and their respective paths
    csv_files = get_data_files_list(args.datasets)

    # decode raw data from each split CSV file in an individual process
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = [
            executor.submit(convert_files, dir_name, f_name, dest_dir)
            for dir_name, f_name in csv_files
        ]
        for f in concurrent.futures.as_completed(results):
            print(f.result())
```
Requirements
I wish to set up a `logging` logger writing to `f_name.log` within each process spawned by the `ProcessPoolExecutor`, so that the logs are stored under the respective parsed file name. I am not sure if I should use something like:
```python
def convert_raw_data(...., logger):
    logger.exception(raw_data_here)

def convert_files(....):
    logger = logging.basicConfig(filename=f_name, level=logging.EXCEPTION)
```
Or are there caveats to using the logging module in a multiprocessing environment?
Answer
I found a simple way to achieve this task:
```python
import logging

def create_log_handler(fname):
    logger = logging.getLogger(name=fname)
    logger.setLevel(logging.ERROR)
    fileHandler = logging.FileHandler(fname + ".log")
    fileHandler.setLevel(logging.ERROR)
    logger.addHandler(fileHandler)
    formatter = logging.Formatter('%(name)s %(levelname)s: %(message)s')
    fileHandler.setFormatter(formatter)
    return logger
```
I called `create_log_handler` within my `convert_files(...)` function and then used `logger.info` and `logger.error` accordingly. By passing the `logger` as a parameter to `convert_raw_data`, I was able to log even the erroneous data points in each of my CSV files in each process.
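For reference, here is a rough sketch of how that wiring could look. The decode step is just a placeholder (`float.fromhex` standing in for the real HEX-to-float conversion) and the path handling is simplified; only the logger plumbing reflects what I actually did:

```python
import csv
import os

# create_log_handler is the function defined above.

def convert_raw_data(incoming_line, f_name, line_counter, logger):
    try:
        # Placeholder decode: assumes each field is a HEX-encoded float.
        # The real conversion logic goes here.
        return [float.fromhex(field) for field in incoming_line]
    except ValueError:
        # The offending raw line ends up in this process's own .log file.
        logger.error("line %d of %s could not be decoded: %r",
                     line_counter, f_name, incoming_line)
        return None

def convert_files(dir_name, f_name, dest_dir_name):
    # One logger (and one .log file) per input file, i.e. per worker process.
    logger = create_log_handler(os.path.join(dest_dir_name, f_name))
    status = True
    with open(os.path.join(dir_name, f_name), newline='') as src, \
         open(os.path.join(dest_dir_name, f_name), 'w', newline='') as dst:
        csv_reader = csv.reader(src)
        csv_writer = csv.writer(dst)
        for line_counter, line in enumerate(csv_reader, start=1):
            decoded = convert_raw_data(line, f_name, line_counter, logger)
            if decoded is None:
                status = False
            else:
                csv_writer.writerow(decoded)
    return f'All good for {f_name}.' if status else f'Failed for {f_name}'
```

Since every input file gets its own logger and its own `.log` file, the worker processes never write to the same log file, so the usual interleaving problems of logging across processes do not come up here.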