
How to improve the performance of traversing a large dataset

I want to improve the logic for a task of mine, defined as follows; the task is implemented in Python 3 with the Django framework:

The source data is onboarded to our system. The Job entity defines what is involved in fully processing a source, and Job_instance is an instance of a Job. The requirements are as follows:

  1. Given the source name, find the latest onboarding date on which all jobs of the source have been processed.
  2. Also output the overall status of the processed jobs.

The entity relationship and table structure are as follows (a rough sketch of the matching Django models is included further below):

  1. “Job” – an entity that defines what is involved in processing a source ingested from an external party. A source is divided into parts that can be processed by multiple jobs. Table structure:

     | Field name | Description    | Type   |
     |------------|----------------|--------|
     | id         | id of job      | int    |
     | name       | name of job    | string |
     | source     | name of source | string |
  2. “Job_instance” – an instance of a job that has been executed in our system. Table structure:

     | Field name        | Description                                                                              | Type      |
     |-------------------|------------------------------------------------------------------------------------------|-----------|
     | id                | id of job instance                                                                       | int       |
     | job_id            | id of job                                                                                | int       |
     | period            | date string on which the source was onboarded to our system                              | string    |
     | updated_timestamp | last execution time and date for the job                                                 | timestamp |
     | status            | status of the instance after the run has completed; can be any of, e.g., success, failed, finished, created | string    |

The overall status of the processed jobs shows as “Success” when all job_instances of each job ran successfully; otherwise it shows as “Failure”.
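
For reference, here is a minimal sketch of how the two models might look in Django. It is written to match the field names actually used in my query snippets below (period_code, update_timestamp) rather than the period/updated_timestamp names in the tables above; the exact field types and lengths are assumptions:

    from django.db import models

    class Job(models.Model):
        """What is involved in processing one part of an ingested source."""
        name = models.CharField(max_length=255)
        source = models.CharField(max_length=255)

    class JobInstance(models.Model):
        """One execution of a Job."""
        job = models.ForeignKey(Job, on_delete=models.CASCADE)
        period_code = models.CharField(max_length=32)   # "period" in the table above
        update_timestamp = models.DateTimeField()       # "updated_timestamp" in the table above
        status = models.CharField(max_length=32)        # e.g. success, failed, finished, created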

My current logic:

  1. Get the list of job ids from the source name; these ids also seed the job_id-to-boolean verification map used in step 4. (An alternative query using an __in lookup is sketched after this list.)

    from django.db.models import Q

    job_ids = Job.objects.filter(
        Q(name__icontains=source)
    ).values_list('id', flat=True)

    # Combine all job ids into a single restriction:
    # construct a where clause that matches any of the job ids
    ids_filter = Q()
    for job_id in job_ids:
        ids_filter = ids_filter | Q(job__id=job_id)
    
  2. Get the list of job instances for those job ids and sort it by period and update_timestamp in descending order (this is done in the database).

    completed_status_filter = Q()
    # Only check job instances that have been completed
    for status in [status_dict.get('Failed'), status_dict.get('Finished')]:
        completed_status_filter = completed_status_filter | Q(
            status__exact=status)

    # Find Finished or Failed job instances for the job ids
    # and sort by period_code and updated timestamp in descending order
    job_instances = (
        JobInstance.objects
        .filter(ids_filter, completed_status_filter)
        .values('id', 'job__id', 'period_code', 'update_timestamp', 'status')
        .exclude(period_code='Ad Hoc')
        .order_by('-period_code', '-update_timestamp')
    )
    
  3. Create a map from period to the list of job_instances. (A plain-Python restatement of steps 3–5 is sketched after this list.)

    def get_period_code_instances_map(self, job_instances):
        period_code_instance_map = {}
        for instance in job_instances.iterator():
            instances = period_code_instance_map.get(instance['period_code'])
            if not instances:
                # First instance seen for this period: start a new list
                period_code_instance_map[instance['period_code']] = [instance]
            else:
                instances.append(instance)

        return period_code_instance_map
    
  4. Traverse the map as follows:

    def get_last_period_code_at_curated(
            self, period_code_instances_map, job_ids, status_dict):
        target_period = None
        job_ids_verification_map = dict((key, False) for key in job_ids)
        # Traverse the instances that ran in the same period_code
        for period_code, instances in period_code_instances_map.items():
            for instance in instances:
                job_id = instance['job__id']
                # Mark the job as visited for this period
                if not job_ids_verification_map.get(job_id):
                    job_ids_verification_map[job_id] = True

            # If any job id has not been visited after traversing the period,
            # reset the job_id verification map and try the next period
            if any(not checked for checked in job_ids_verification_map.values()):
                job_ids_verification_map = dict(
                    (key, False) for key in job_ids)
            else:
                target_period = period_code
                break

        result = JobInstanceLatestPeriodCodeForSource()
        if not target_period:
            return result

        # Once the period_code is found, take the timestamp from the first
        # instance and evaluate the overall status from all instances
        instances = period_code_instances_map[target_period]
        result.period_code = target_period
        result.update_timestamp = instances[0]['update_timestamp']
        failed_status = status_dict.get('Failed')
        if any(instance['status'] == failed_status for instance in instances):
            result.status = 'Failed'
        else:
            result.status = 'Finished'

        return result
    

    a. Iterate over all job instances in the period to check whether all jobs have been run. Here I used the job_id-to-boolean map to track which jobs have been visited for the period.

    b. If all jobs have been run in the period, the period is found; otherwise, keep traversing the map.

  5. When the period is found, iterate over that period's job instance list from the map again to determine the overall status.
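
For steps 1 and 2, here is a compact sketch of the same two queries using __in lookups instead of OR-ing Q objects together. Model and field names are taken from my snippets above, and status_dict is assumed to map display names to stored status values; this is only an illustration, not a drop-in replacement:

    def get_completed_instances(source, status_dict):
        """Steps 1 and 2 expressed with __in lookups."""
        # Step 1: ids of all jobs belonging to the source
        job_ids = list(
            Job.objects.filter(name__icontains=source).values_list('id', flat=True)
        )

        completed_statuses = [status_dict.get('Failed'), status_dict.get('Finished')]

        # Step 2: completed, non-ad-hoc instances of those jobs, newest period first
        job_instances = (
            JobInstance.objects
            .filter(job__id__in=job_ids, status__in=completed_statuses)
            .exclude(period_code='Ad Hoc')
            .values('id', 'job__id', 'period_code', 'update_timestamp', 'status')
            .order_by('-period_code', '-update_timestamp')
        )
        return job_ids, job_instances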
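
And for steps 3–5, a self-contained restatement on plain dicts and sets, with no Django involved, assuming instances is an iterable of dicts already sorted newest period first as produced in step 2. Tracking a set of seen job ids per period replaces the reset of the verification map:

    from collections import defaultdict

    def find_latest_complete_period(job_ids, instances, failed_status):
        # Step 3: group instances by period_code; dict insertion order
        # (Python 3.7+) keeps the newest-first ordering of the stream
        period_map = defaultdict(list)
        for instance in instances:
            period_map[instance['period_code']].append(instance)

        required = set(job_ids)
        for period_code, period_instances in period_map.items():
            # Step 4: the period is complete when every required job id appears
            seen = {inst['job__id'] for inst in period_instances}
            if required <= seen:
                # Step 5: overall status for the chosen period
                failed = any(
                    inst['status'] == failed_status for inst in period_instances
                )
                return {
                    'period_code': period_code,
                    'update_timestamp': period_instances[0]['update_timestamp'],
                    'status': 'Failed' if failed else 'Finished',
                }
        return None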

Limitation:

  • Since the list of job_instances can be quite big (more than 1 million entries), I have used an iterator to build the map while going through the data from the database, but any optimization suggestion is welcome.
  • Is there a more time- or space-efficient way to find the period code? I found I had to use a nested for loop for the traversal, which gives me O(n²).

Thanks in advance


Answer

Found the solution: since it can take some time to process the result set, I asked the upstream user to poll every 5 seconds until the result has been calculated.
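
For completeness, the caller-side polling can look roughly like the sketch below; the endpoint URL and the ready/result fields in the response are made-up placeholders, not part of the actual API:

    import time

    import requests

    def wait_for_result(result_url, interval=5, timeout=600):
        """Poll every `interval` seconds until the result has been calculated."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            response = requests.get(result_url)
            response.raise_for_status()
            payload = response.json()
            if payload.get('ready'):       # hypothetical completion flag
                return payload['result']   # hypothetical result payload
            time.sleep(interval)
        raise TimeoutError('result was not calculated within the timeout')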
