I want to improve the logic for my task, which is defined as follows. The task is implemented in Python 3 with the Django framework:
Source data is onboarded into our system. The Job entity defines what is involved in fully processing a source, and Job_instance is an instance (one execution) of a Job. The requirements are as follows:
- Given the source name, find the latest onboard date for which all jobs of the source have been processed.
- Also output the overall status of the jobs processed for that date.
The entity relationships and table structures:
- “Job”: an entity that defines what is involved in processing a source ingested from an external party. A source is divided into parts, which can be processed by multiple jobs. Table structure:
Field name | description | type |
---|---|---|
id | id of job | int |
name | name of job | string |
source | name of source | string |
- “Job_instance”: an instance of a job that has been executed in our system. Table structure:
Field name | description | type |
---|---|---|
id | id of job instance | int |
job_id | id of job | int |
period | date string of when the source was onboarded into our system | string |
updated_timestamp | last execution time and date for the job | timestamp |
status | status of the instance after the run has completed; one of e.g. success, failed, finished, created, etc. | string |
The overall status of the processed jobs is “Success” when every job_instance of each job ran successfully; otherwise it is “Failure”.
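The status rule above can be expressed as a small pure-Python helper. This is only a sketch: `overall_status` is a hypothetical name, and which raw status strings count as "successful" is an assumption (here `success` and `finished`, matched case-insensitively). An empty input is treated as "Failure", since nothing has been processed.

```python
from typing import Iterable

# Assumption: these raw job_instance statuses count as a successful run.
SUCCESSFUL_STATUSES = frozenset({"success", "finished"})


def overall_status(statuses: Iterable[str]) -> str:
    """Return "Success" only if every instance status is successful."""
    seen_any = False
    for status in statuses:
        seen_any = True
        if status.lower() not in SUCCESSFUL_STATUSES:
            # One unsuccessful instance is enough to fail the whole set.
            return "Failure"
    # No instances at all is treated as a failure (design assumption).
    return "Success" if seen_any else "Failure"


print(overall_status(["success", "Finished"]))  # Success
print(overall_status(["success", "failed"]))    # Failure
```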
My current logic:
Get the list of jobs for the source name and cache their job_ids (used later as a job_id → boolean map):
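```python
from django.db.models import Q

# Look up the ids of all jobs that belong to the source.
# list() evaluates the query once so the ids can be reused without re-querying.
job_ids = list(
    Job.objects.filter(Q(name__icontains=source)).values_list('id', flat=True)
)

# Combine all job ids into one restriction: an OR-ed WHERE clause
# that matches any of the job ids.
ids_filter = Q()
for job_id in job_ids:
    ids_filter = ids_filter | Q(job__id=job_id)
```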
Get the list of job instances for those job ids, sorted by period and update_timestamp in descending order (the sorting is done in the database):
```python
# Only check job instances that have completed (Failed or Finished).
completed_status_filter = Q()
for status in [status_dict.get('Failed'), status_dict.get('Finished')]:
    completed_status_filter = completed_status_filter | Q(status__exact=status)

# Find Finished or Failed job instances for the job ids,
# sorted by period_code and update_timestamp in descending order.
job_instances = (
    JobInstance.objects
    .filter(ids_filter, completed_status_filter)
    .values('id', 'job__id', 'period_code', 'update_timestamp', 'status')
    .exclude(period_code='Ad Hoc')
    .order_by('-period_code', '-update_timestamp')
)
```
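Because the period is stored as a date string, `order_by('-period_code', ...)` uses string ordering, so "latest period first" only holds if the format sorts lexicographically in date order. The plain-Python snippet below illustrates the difference; the two sample formats are assumptions, since the actual period format isn't specified.

```python
from datetime import datetime

# Hypothetical sample periods; the real period format is not specified.
iso_periods = ["2023-01-31", "2022-12-31", "2023-02-28"]
dmy_periods = ["31/01/2023", "31/12/2022", "28/02/2023"]

# ISO 8601 (YYYY-MM-DD): descending string order equals descending date order.
iso_desc = sorted(iso_periods, reverse=True)

# DD/MM/YYYY: descending string order puts 31/12/2022 first, which is wrong.
dmy_desc_as_text = sorted(dmy_periods, reverse=True)

# Parsing to real dates restores chronological order.
dmy_desc_as_date = sorted(
    dmy_periods, key=lambda s: datetime.strptime(s, "%d/%m/%Y"), reverse=True
)

print(iso_desc)          # ['2023-02-28', '2023-01-31', '2022-12-31']
print(dmy_desc_as_text)  # ['31/12/2022', '31/01/2023', '28/02/2023']
print(dmy_desc_as_date)  # ['28/02/2023', '31/01/2023', '31/12/2022']
```

If the stored format is not ISO-like, the database sort would need a real date column or a normalized string for the traversal below to start from the latest period.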
Create a map from each period to its list of job instances:
```python
def get_period_code_instances_map(self, job_instances):
    # Group the rows by period_code. Dicts preserve insertion order
    # (Python 3.7+), so periods stay in the query's descending order.
    period_code_instance_map = {}
    for instance in job_instances.iterator():
        instances = period_code_instance_map.get(instance['period_code'])
        if not instances:
            existing_instances = list()
            period_code_instance_map[instance['period_code']] = existing_instances
            existing_instances.append(instance)
        else:
            instances.append(instance)

    return period_code_instance_map
```
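The same map can be built more compactly with `collections.defaultdict(list)`. This is a sketch written as a plain function over any iterable of row dicts (in Django you would pass `job_instances.iterator()`); `group_by_period_code` is a hypothetical name. Since `defaultdict` is a `dict` subclass, it keeps insertion order, so periods stay in the descending order the query returned them in, which the traversal step relies on.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def group_by_period_code(rows: Iterable[Row]) -> Dict[str, List[Row]]:
    """Group row dicts by their 'period_code' key, preserving first-seen order."""
    grouped: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        grouped[row["period_code"]].append(row)
    # Return a plain dict so a missing key raises KeyError instead of
    # silently creating an empty list.
    return dict(grouped)


rows = [
    {"job__id": 1, "period_code": "2023-02-28", "status": "Finished"},
    {"job__id": 2, "period_code": "2023-02-28", "status": "Failed"},
    {"job__id": 1, "period_code": "2023-01-31", "status": "Finished"},
]
grouped = group_by_period_code(rows)
print(list(grouped))  # ['2023-02-28', '2023-01-31']
```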
Then traverse the map as follows:
```python
def get_last_period_code_at_curated(
        self, period_code_instances_map, job_ids, status_dict):

    target_period = None
    job_ids_verification_map = dict((key, False) for key in job_ids)
    # Traverse the instances that ran in the same period_code.
    for period_code, instances in period_code_instances_map.items():
        for instance in instances:
            job_id = instance['job__id']
            # Mark the job as visited if it has not been visited yet.
            if not job_ids_verification_map.get(job_id):
                job_ids_verification_map[job_id] = True

        # If some job id was not visited in this period, reset the
        # job id verification map and move on to the next period.
        if any(not checked for checked in job_ids_verification_map.values()):
            job_ids_verification_map = dict(
                (key, False) for key in job_ids)
        else:
            target_period = period_code
            break

    result = JobInstanceLatestPeriodCodeForSource()
    if target_period is None:
        return result

    # Once the period_code is found, take the timestamp from the first
    # (most recently updated) instance, then evaluate the overall status
    # by checking all statuses in the period.
    instances = period_code_instances_map[target_period]
    result.period_code = target_period
    result.update_timestamp = instances[0]['update_timestamp']
    failed_status = status_dict.get('Failed')
    if any(instance['status'] == failed_status for instance in instances):
        result.status = 'Failed'
    else:
        result.status = 'Finished'

    return result
```
a. Iterate over all job instances in a period to check whether every job has been run. Here I use the job_id → boolean map to track which jobs have been visited in the period.
b. If all jobs have been run in the period, that period is the result; otherwise, keep traversing the map.
Once the period is found, I iterate over its job instance list again to determine the overall status.
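The job_id → boolean map in step a. can be replaced by a set comparison: a period is complete when the set of required job ids is a subset of the job ids seen in that period. A minimal sketch; `period_is_complete` is a hypothetical helper name, and instances are assumed to be dicts with a `job__id` key as returned by `.values(...)`.

```python
from typing import Any, Dict, Iterable, List


def period_is_complete(job_ids: Iterable[int],
                       instances: List[Dict[str, Any]]) -> bool:
    """True if every required job id has at least one instance in the period."""
    seen = {instance["job__id"] for instance in instances}
    return set(job_ids) <= seen


instances = [{"job__id": 1}, {"job__id": 2}, {"job__id": 1}]
print(period_is_complete([1, 2], instances))     # True
print(period_is_complete([1, 2, 3], instances))  # False
```

This avoids rebuilding the verification dict for every incomplete period.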
Limitations:
- The list of job_instances can be quite big (> 1 million entries), so I use `iterator()` to build the map while reading the data from the database. Any optimization suggestions are welcome.
- Is there a more time- or space-efficient way to find the period code? I have to use a nested for loop for the traversal, which gives me O(n²).
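On the complexity question: the nested loop visits each instance only once in total, so the traversal is roughly O(n + P·J) (P periods, J jobs, for re-creating and checking the verification map per period) rather than O(n²). The larger cost is that building the full map first holds all n rows in memory. Since the rows already arrive sorted by period, one alternative is a single streaming pass that keeps only the current period's state and stops at the first complete period. This sketch assumes rows are dicts shaped like the `.values(...)` output above, sorted by `-period_code, -update_timestamp`, and that `failed_status` is the raw failure status string; `find_latest_complete_period` is a hypothetical name, and it returns a plain dict instead of `JobInstanceLatestPeriodCodeForSource`.

```python
from typing import Any, Dict, Iterable, Optional

Row = Dict[str, Any]


def find_latest_complete_period(
    rows: Iterable[Row],
    job_ids: Iterable[int],
    failed_status: str,
) -> Optional[Row]:
    """Return a summary of the latest period that has an instance for every
    job id, or None if no period is complete.

    Assumes rows arrive grouped by period_code in descending order, with the
    newest update_timestamp first inside each period (as the query returns).
    """
    required = set(job_ids)
    current = None       # period_code currently being accumulated
    seen = set()         # job ids seen in the current period
    timestamp = None     # update_timestamp of the period's first row
    failed = False       # True if any instance in the period failed

    def summary() -> Row:
        return {
            "period_code": current,
            "update_timestamp": timestamp,
            "status": "Failed" if failed else "Finished",
        }

    for row in rows:
        if row["period_code"] != current:
            # Period boundary: the previous period has been fully read,
            # so it can be checked for completeness.
            if current is not None and required <= seen:
                return summary()
            current = row["period_code"]
            seen = set()
            timestamp = row["update_timestamp"]
            failed = False
        seen.add(row["job__id"])
        failed = failed or row["status"] == failed_status

    # The last period has no following boundary, so check it after the loop.
    if current is not None and required <= seen:
        return summary()
    return None
```

In Django, `rows` could be `job_instances.iterator()`. Memory stays at O(J) for the current period, and because the function returns at the first complete period, the remaining rows may never be fetched, depending on the database backend and driver.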
Thanks in advance
Answer
Found the solution: since processing the result set can take some time, I asked the upstream user to poll every 5 seconds until the result has been calculated.
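A minimal client-side polling loop along the lines of this answer, sketched in plain Python. `fetch_result` is a hypothetical callable that returns `None` while the result is still being calculated; the 5-second default interval comes from the answer, while the timeout is an added assumption so the caller cannot wait forever.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until_ready(
    fetch_result: Callable[[], Optional[T]],
    interval_seconds: float = 5.0,
    timeout_seconds: float = 300.0,
) -> Optional[T]:
    """Call fetch_result every interval_seconds until it returns a non-None
    value or timeout_seconds elapses; return None on timeout."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        result = fetch_result()
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval_seconds)
```

`time.monotonic()` is used for the deadline so that wall-clock adjustments do not shorten or extend the wait.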