Observations on sequential and asynchronous execution in Python

Goal: process 100K SQLite files, each approximately 300 MB (decode a blob field and delete matching rows).

Machine specs: 64 cores, 100 GB RAM. The machine already runs other critical services, hence max_workers=15. Without that throttle, memory usage goes through the roof: roughly max_workers times the size of one file, since each worker materializes a whole file's rows in memory. For reference, the default max_workers is min(32, os.cpu_count() + 4) on Python 3.8+ (older versions used number of cores x 5).
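
That default can be checked directly; a quick snippet (the _max_workers attribute is a private CPython implementation detail, read here only for inspection):

import os
import concurrent.futures

# On this 64-core box, Python 3.8+ defaults to min(32, 64 + 4) = 32 threads.
print(os.cpu_count())
with concurrent.futures.ThreadPoolExecutor() as executor:
    print(executor._max_workers)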

Concurrent/async version: asyn_process.py

import sqlite3
import json
import concurrent.futures
import logging
import time

logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('/tmp/mylog.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

def rm_eventx_from_db(sqlitefilename, logger):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()

        cursor.execute('SELECT ID, LOG FROM OLD_LOGS')
        idlist = []

        # fetchall() materializes the whole table in memory at once, which is
        # why each worker holds roughly one file's worth of data.
        for row in cursor.fetchall():
            colid = row[0]
            msg = row[1]
            m = msg.decode('utf-8')  # LOG is stored as a blob
            msgjson = json.loads(m)
            if msgjson['_normalized_fields']['event_id'] == 12345:
                idlist.append(colid)

        # Delete every matching row in one batch.
        cursor.executemany('DELETE FROM OLD_LOGS WHERE ID = ?',
                           [(delete_id,) for delete_id in idlist])

        conn.commit()
        cursor.close()
        conn.close()
        logger.warning(f"processing done for {sqlitefilename}")
    except Exception as e:
        logger.warning(f"rm_eventx_from_db err: {sqlitefilename} " + str(e))

def vaccumdb(sqlitefilename):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()
        # VACUUM rebuilds the database file to reclaim the space freed by the
        # deletes above; the file only shrinks on disk after this step.
        cursor.execute('VACUUM')
        conn.commit()
        cursor.close()
        conn.close()
    except Exception as e:
        logger.warning(f"vaccum_db err: {sqlitefilename} " + str(e))

def main():
    start_time = time.perf_counter()
    futures = []
    listfile = '/tmp/filelist.txt'
    base_path = '/data/storage/archive/'

    # Pass 1: delete matching rows from every file, 15 files at a time.
    with open(listfile, 'r') as file:
        with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
            for line in file:
                file_path = base_path + line.strip()
                print(file_path)
                futures.append(executor.submit(rm_eventx_from_db, file_path, logger))
    # The executor's context manager has already waited for every future here;
    # result() is always None because the worker logs and swallows its own exceptions.
    for future in concurrent.futures.as_completed(futures):
        logger.warning("futures msg : " + str(future.result()))

    # Pass 2: VACUUM each file to shrink it on disk.
    fut_vac = []
    with open(listfile, 'r') as file:
        with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
            for line in file:
                fut_vac.append(executor.submit(vaccumdb, base_path + line.strip()))
    for future in concurrent.futures.as_completed(fut_vac):
        logger.warning("vaccum futures msg : " + str(future.result()))

    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"Elapsed time: {execution_time:.6f} Seconds")

if __name__ == "__main__":
    main()
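
The memory behavior is driven by fetchall(), which pulls an entire ~300 MB table into RAM per worker. A minimal sketch of a lower-memory variant, assuming the same OLD_LOGS schema (rm_eventx_streaming is a hypothetical name, not part of the original script), streams rows in fixed-size batches instead:

import sqlite3
import json

BATCH = 10000  # rows fetched per round trip; tune to taste

def rm_eventx_streaming(sqlitefilename, logger):
    # Same logic as rm_eventx_from_db, but never holds the whole table in RAM.
    # The logger parameter is kept only for drop-in compatibility with executor.submit.
    conn = sqlite3.connect(sqlitefilename)
    cursor = conn.cursor()
    cursor.execute('SELECT ID, LOG FROM OLD_LOGS')
    idlist = []
    while True:
        rows = cursor.fetchmany(BATCH)  # only BATCH rows are live at any time
        if not rows:
            break
        for colid, msg in rows:
            msgjson = json.loads(msg.decode('utf-8'))
            if msgjson['_normalized_fields']['event_id'] == 12345:
                idlist.append(colid)
    cursor.executemany('DELETE FROM OLD_LOGS WHERE ID = ?', [(i,) for i in idlist])
    conn.commit()
    conn.close()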

Here are some top stats for the concurrent run:

# top -H -p 1545043

top - 15:10:49 up 233 days, 23:17,  1 user,  load average: 9.39, 11.37, 12.03
Threads:  16 total,   2 running,  14 sleeping,   0 stopped,   0 zombie
%Cpu(s): 11.5 us, 11.4 sy,  0.4 ni, 74.9 id,  1.1 wa,  0.0 hi,  0.6 si,  0.0 st
MiB Mem : 100699.4 total,   3401.5 free,  83303.5 used,  13994.4 buff/cache
MiB Swap:   4096.0 total,     26.1 free,   4069.9 used.  16514.7 avail Mem 

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND                                                                                                                                     
1545055 root      20   0 5464740   4.3g  15252 S  26.3   4.4   1:59.90 python async_process1.py                                                                                                                             
1545059 root      20   0 5464740   4.3g  15252 R  25.0   4.4   1:54.33 python async_process1.py                                                                                                                             
1545061 root      20   0 5464740   4.3g  15252 S  24.7   4.4   1:54.30 python async_process1.py                                                                                                                             
1545062 root      20   0 5464740   4.3g  15252 S  24.3   4.4   1:53.59 python async_process1.py                                                                                                                             
1545067 root      20   0 5464740   4.3g  15252 S  24.3   4.4   1:53.75 python async_process1.py                                                                                                                             
1545057 root      20   0 5464740   4.3g  15252 S  24.0   4.4   1:53.75 python async_process1.py                                                                                                                             
1545058 root      20   0 5464740   4.3g  15252 R  23.7   4.4   1:53.95 python async_process1.py                                                                                                                             
1545066 root      20   0 5464740   4.3g  15252 S  23.7   4.4   1:54.01 python async_process1.py                                                                                                                             
1545063 root      20   0 5464740   4.3g  15252 S  23.3   4.4   1:54.32 python async_process1.py                                                                                                                             
1545064 root      20   0 5464740   4.3g  15252 S  23.3   4.4   1:54.03 python async_process1.py                                                                                                                             
1545065 root      20   0 5464740   4.3g  15252 S  23.3   4.4   1:53.85 python async_process1.py                                                                                                                             
1545068 root      20   0 5464740   4.3g  15252 S  23.3   4.4   1:53.48 python async_process1.py                                                                                                                             
1545069 root      20   0 5464740   4.3g  15252 S  23.3   4.4   1:54.11 python async_process1.py                                                                                                                             
1545056 root      20   0 5464740   4.3g  15252 S  23.0   4.4   1:53.73 python async_process1.py                                                                                                                             
1545054 root      20   0 5464740   4.3g  15252 S  22.7   4.4   1:59.47 python async_process1.py                                                                                                                             
1545043 root      20   0 5464740   4.3g  15252 S   0.0   4.4   0:01.89 python async_process1.py

The total resident memory consumed by the script is 4.3 GB, close to the expected 15 workers x ~300 MB ≈ 4.5 GB. The 16 threads shown above are the 15 worker threads plus the main thread.

From the log, the number of files processed per minute varies between 2 and 15.
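
To pin down where that variance comes from, one could time each file individually. A hypothetical wrapper (timed is not part of the original script) that drops straight into executor.submit:

import functools
import logging
import time

logger = logging.getLogger('my_logger')  # same logger the script configures

def timed(func):
    # Log each file's duration so slow files can be correlated with
    # file size or I/O contention from the other services on the box.
    @functools.wraps(func)
    def wrapper(sqlitefilename, *args, **kwargs):
        t0 = time.perf_counter()
        result = func(sqlitefilename, *args, **kwargs)
        logger.warning(f"{func.__name__} {sqlitefilename} took {time.perf_counter() - t0:.1f}s")
        return result
    return wrapper

# usage: executor.submit(timed(rm_eventx_from_db), file_path, logger)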

Below is the synchronous version, sync_process2.py:

import sqlite3
import json
import logging
import time

logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('/tmp/mylog2.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

def rm_eventx_from_db(sqlitefilename, logger):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()

        cursor.execute('SELECT ID, LOG FROM OLD_LOGS')
        idlist = []

        # fetchall() materializes the whole table in memory at once.
        for row in cursor.fetchall():
            colid = row[0]
            msg = row[1]
            m = msg.decode('utf-8')  # LOG is stored as a blob
            msgjson = json.loads(m)
            if msgjson['_normalized_fields']['event_id'] == 36870:
                idlist.append(colid)

        # Delete every matching row in one batch.
        cursor.executemany('DELETE FROM OLD_LOGS WHERE ID = ?',
                           [(delete_id,) for delete_id in idlist])

        conn.commit()
        cursor.close()
        conn.close()
        logger.warning(f"processing done for {sqlitefilename}")
    except Exception as e:
        logger.warning(f"rm_eventx_from_db err: {sqlitefilename} " + str(e))

def vaccumdb(sqlitefilename):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()
        # VACUUM rebuilds the database file to reclaim the freed space.
        cursor.execute('VACUUM')
        conn.commit()
        cursor.close()
        conn.close()
    except Exception as e:
        logger.warning(f"vaccum_db err: {sqlitefilename} " + str(e))

def main():
    start_time = time.perf_counter()
    listfile = '/tmp/filelist2.txt'
    base_path = '/data/archives/lake/'

    # Process the files strictly one after another: delete, then vacuum.
    with open(listfile, 'r') as file:
        for line in file:
            file_path = base_path + line.strip()
            print(file_path)
            rm_eventx_from_db(file_path, logger)
            vaccumdb(file_path)

    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"Elapsed time: {execution_time:.6f} Seconds")

if __name__ == "__main__":
    main()

The log shows that, 99% of the time, the synchronous script processes 3 files per minute.
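
Those rates matter at a 100K-file scale. A rough worked estimate:

    100,000 files / 3 per minute  ≈ 33,333 min ≈ 23 days    (synchronous)
    100,000 files / 15 per minute ≈  6,667 min ≈ 4.6 days   (concurrent, best observed rate)
    100,000 files / 2 per minute  ≈ 50,000 min ≈ 35 days    (concurrent, worst observed rate)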

Below are the CPU and memory usage stats, captured while both scripts were running:

top - 02:20:56 up 234 days, 10:27,  1 user,  load average: 95.08, 95.59, 95.43
Tasks: 1178 total,   2 running, 1176 sleeping,   0 stopped,   0 zombie
%Cpu(s): 10.8 us,  9.8 sy,  0.1 ni, 77.7 id,  1.3 wa,  0.0 hi,  0.4 si,  0.0 st
MiB Mem : 100699.4 total,    637.1 free,  80412.8 used,  19649.5 buff/cache
MiB Swap:   4096.0 total,     17.7 free,   4078.3 used.  19406.4 avail Mem 

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND                                                                                                                                     
1352886 root      20   0 5223396   4.1g  18236 S 339.0   4.1 284:48.95 python /script/asyn_process.py                                                                                                               
2542922 root      20   0  311076 295640   5452 R  99.7   0.3  27:14.71 python /script/sync_process.py

Concurrent execution offers a clear throughput advantage over synchronous execution for this workload, at the cost of memory. The choice between the two approaches involves trade-offs that depend on available resources, time constraints, and the other processes already running on the machine.

Since much of the per-file work (blob decoding and JSON parsing) is CPU-bound, the threads are also constrained by the GIL: only one thread executes Python bytecode at a time, and the ~340% CPU observed above likely comes from the sqlite3 module's C code, which releases the GIL while running SQL. For truly CPU-bound stages, Python threads may not be the most suitable tool; processes sidestep the GIL.
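
A minimal sketch of a process-based variant, assuming the same rm_eventx_from_db and vaccumdb workers defined above (process_one is a hypothetical helper): ProcessPoolExecutor gives each worker its own interpreter and GIL, so the JSON decoding can run on separate cores in parallel.

import concurrent.futures
import logging

def process_one(file_path):
    # Look up the logger inside the worker so the sketch does not depend on
    # pickling logger objects across process boundaries.
    logger = logging.getLogger('my_logger')
    rm_eventx_from_db(file_path, logger)  # same worker functions as above
    vaccumdb(file_path)
    return file_path

def main():
    base_path = '/data/storage/archive/'
    with open('/tmp/filelist.txt') as file:
        paths = [base_path + line.strip() for line in file]
    # Keep the same throttle; each worker is now a separate process, so the
    # memory budget is still roughly max_workers x one file's working set.
    with concurrent.futures.ProcessPoolExecutor(max_workers=15) as executor:
        for done in executor.map(process_one, paths):
            print(f"done: {done}")

if __name__ == "__main__":
    main()

On Linux the default fork start method lets the worker processes inherit the functions and logging configuration defined at module level, so the structure above stays close to the original script.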