首页 > 解决方案 > Python multiprocessing make multiple api calls

问题描述

i'm trying to speed up my code with multiprocessing and making multiple api calls at once. currently i'm making an api call, get the data needed from there and then insert them into the database. it works but it's very slow. i need to have about 700-800 million users in the database and at this current speed it will take about 200-250 days. how can i make multiple api calls?

import traceback
import requests
import json
import sys
from time import time, sleep
from multiprocessing import Process, Queue
from io import BytesIO
import imagehash
from PIL import Image
import sqlite3
from multiprocessing import Process
from multiprocessing import Pool as ThreadPool 

min = 7960265729
max = 9080098567

database_location = 'D:/Script/steam_database.db'
key = []
pool_size = 32
image_hashes = []

def queue_flusher(queue, flush_limit=80, temp = 0):
    connection = sqlite3.connect(database_location)
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS user (id INTEGER PRIMARY KEY AUTOINCREMENT, hash TEXT, profile TEXT)")
    connection.commit()
    while True:
        if(queue.qsize() < flush_limit):
            sleep(.1)
        else:
            temp += 80
            print(f"Flushing {flush_limit} out of queue {temp}")
            queue_input = [queue.get() for _ in range(0, flush_limit)]
            cursor = connection.cursor()
            for row in queue_input:
                if row['image'] not in image_hashes:
                    print(f"Inserting Row: {repr(row)}")
                    cursor.execute("INSERT INTO user (hash, profile) VALUES (?, ?);", (row['image'], row['profileUrl']))
                    image_hashes.append(row['image'])
            connection.commit()
    connection.close()

def databaseFiller(queue, min = 0, max = 0):
    while True:
        try:
            for i in range(min, max):
                r = requests.get(f'http://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={key[3]}&steamids=7656119{i}').json()
                pool = ThreadPool(8)
                all = pool.map(databaseFiller, i)
                response = r

                player = None
                steamid = None

                response = response.get('response', None)
                if response is None or not response.get('players', None):
                    continue
                player = response['players'][0]
                pfp = player.get('avatar', None)
                profileUrl = player.get('profileurl', None)
                if pfp != "https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/fe/fef49e7fa7e1997310d705b2a6158ff8dc1cdfeb.jpg":
                    img = requests.get(pfp)
                    img = Image.open(BytesIO(img.content))
                    image = str(imagehash.average_hash(img))
                    queue.put({'image': image, 'profileUrl': profileUrl})

        except Exception as e:
            # print(f'Received Response: {response}')
            print("Printing only the traceback above the current stack frame")
            print("".join(traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2])))
            print("Printing the full traceback as if we had not caught it here...")
            print(format_exception(e))


def format_exception(e):

    exception_list = traceback.format_stack()
    exception_list = exception_list[:-2]
    exception_list.extend(traceback.format_tb(sys.exc_info()[2]))
    exception_list.extend(traceback.format_exception_only(
        sys.exc_info()[0], sys.exc_info()[1]))

    exception_str = "Traceback (most recent call last):\n"
    exception_str += "".join(exception_list)
    exception_str = exception_str[:-1]

    return exception_str

if __name__ == '__main__':
    database_connection = sqlite3.connect("steam_database.db")
    data_queue = Queue()
    data_flush_process = Process(target=queue_flusher, args=([data_queue]))
    data_flush_process.start()
    total_nums = max - min
    nums_per_process = total_nums // pool_size
    for i in range(pool_size):
        new_min = min + (nums_per_process * i)
        new_max = max if i == (pool_size-1) else new_min + nums_per_process
        Process(target=databaseFiller, args=([data_queue, new_min, new_max])).start()

thanks.

标签: pythondatabaseapimultiprocessingpython-multiprocessing

解决方案


这不会 100% 解决您的问题,但我看到您正在将文本插入到 sqlite 文件中,您应该将整个内容下载到例如 csv 中并使用执行 cursor.executemany 而不是 cursor.execute。插入速度更快。下载 1 次需要多长时间?


推荐阅读