from lxml import html, etree import requests import threading import time import igraph as ig import sqlite3 from sqlite3 import Error class mathPage: def __init__(self, mathID): """ Create an empty object only containing the ID """ self.id = mathID self.mirror = "http://mathgenealogy.org/" self.url = self.mirror+"id.php?id="+str(mathID) self.name = None self.title = None self.inst = None self.year = None self.diss = None self.advisorID = [None, None, None] def getEntry(self): return((self.id, self.name, self.title, self.inst, self.year, self.diss, self.advisorID[0], self.advisorID[1], self.advisorID[2])) def getInfo(self): temp = requests.get(self.url) temp.encoding = "utf-8" self.page = temp.text if "Please back up and try again" in temp.text: self.name = "Error" self.title = None self.inst = None self.diss = None self.advisorID = [None, None, None] else: self.tree = html.fromstring(self.page) self.parsePage() def checkPage(self): """ Check if the page exists before parsing to avoid errors. """ pass def parsePage(self): name1 = self.tree.xpath('//*[@id="paddingWrapper"]/h2/text()') self.name = str(name1[0]).replace(" "," ").strip() title1 = self.tree.xpath('//*[@id="paddingWrapper"]/div[2]/span/text()[1]') title2 = str(title1[0]) if title2 and not title2.isspace(): self.title = title2.strip() else: self.title = " " inst1 = self.tree.xpath('//*[@id="paddingWrapper"]/div[2]/span/span/text()') if len(inst1)>0: inst2 = str(inst1[0]) else: inst2 = " " if inst2 and not inst2.isspace(): self.inst = inst2.strip() else: self.inst = " " year1 = self.tree.xpath('//*[@id="paddingWrapper"]/div[2]/span/text()[2]') year2 = str(year1[0]) if year2 and not year2.isspace(): self.year = year2.strip() else: self.year = " " diss1 = self.tree.xpath('//*[@id="thesisTitle"]/text()') diss2 = str(diss1[0]) if diss2 and not diss2.isspace(): self.diss = diss2.strip() else: self.diss = " " ### May be problematic but unused at the moment advisor1 = self.tree.xpath('//*[@id="paddingWrapper"]/p[2]/a/text()') advisor = list() for s in advisor1: advisor.append(str(s).replace(" "," ")) ### Problematic part ---------------- advisor1 = self.tree.xpath('//*[@id="paddingWrapper"]/p[3]/a/text()') for s in advisor1: if "\n" not in s: # print(s) ### Debugging advisor.append(str(s).replace(" "," ")) #---------------------------------------- self.advisor = advisor advisorID1 = self.tree.xpath('//*[@id="paddingWrapper"]/p[2]/a/@href') advisorID = list() for s in advisorID1: advisorID.append(int(str(s)[10:])) advisorID1 = self.tree.xpath('//*[@id="paddingWrapper"]/p[3]/a/@href') for s in advisorID1: if "Chrono" not in s: advisorID.append(int(str(s)[10:])) if len(advisorID) == 0: advisorID.append(0) if len(advisorID) == 1: advisorID.append(0) if len(advisorID) == 2: advisorID.append(0) self.advisorID = advisorID studentsID1 = self.tree.xpath('//*[@id="paddingWrapper"]/table/tr/td[1]/a/@href') studentsID = list() for s in studentsID1: studentsID.append(int(str(s)[10:])) self.studentsID = studentsID studentsInst1 = self.tree.xpath('//*[@id="paddingWrapper"]/table/tr/td[2]/text()') studentsInst = list() for s in studentsInst1: studentsInst.append(str(s)) self.studentsInst = studentsInst studentsYear1 = self.tree.xpath('//*[@id="paddingWrapper"]/table/tr/td[3]/text()') studentsYear = list() for s in studentsYear1: studentsYear.append(s) self.studentsYear = studentsYear class mathDB: def __init__(self, db_file): self.db_file = db_file self.init_db() def createConnection(self): """ create a database connection to a SQLite database """ conn = None try: conn = sqlite3.connect(self.db_file) #print(sqlite3.version) except Error as e: print(e) finally: if conn: return(conn) def create_table(self, conn, create_table_sql): """ create a table from the create_table_sql statement :param conn: Connection object :param create_table_sql: a CREATE TABLE statement :return: """ try: c = conn.cursor() c.execute(create_table_sql) except Error as e: print(e) def init_db(self): conn = self.createConnection() sql_create_main_table = """CREATE TABLE IF NOT EXISTS mathematicians ( id integer PRIMARY KEY, name text NOT NULL, title text, institute text, year integer, thesis text, first_advisor integer, second_advisor integer, third_advisor integer );""" if conn is not None: self.create_table(conn, sql_create_main_table) conn.close() else: print("Cannot create database connection") def insert_person(self, mathPage): conn = self.createConnection() cur = conn.cursor() row = self.get_person(mathPage.id) if len(row) == 0: cur.execute("INSERT INTO mathematicians VALUES (?,?,?,?,?,?,?,?,?)", mathPage.getEntry()) else: cur.execute(f""" UPDATE mathematicians SET name = '{mathPage.name}', title = '{mathPage.title}', institute = '{mathPage.inst}', year = '{mathPage.year}', thesis = '{mathPage.diss}', first_advisor = '{mathPage.advisorID[0]}', second_advisor = '{mathPage.advisorID[1]}', third_advisor = '{mathPage.advisorID[2]}' WHERE id = {mathPage.id} """) conn.commit() conn.close() def get_person(self, id): conn = self.createConnection() cur = conn.cursor() res = cur.execute(f"SELECT * FROM mathematicians WHERE id = {id}") res = cur.fetchall() conn.close() return res def populate_db(self, limit, chunk = 10): if type(limit) == int: limit = range(1, limit+1) try: l = len(limit) except TypeError as e: print("Wrong parameter. Limit must be an iterable object or an integer") n = l // chunk r = l % chunk for i in range(n): threads = list() persons = list() for j in range(chunk): persons.append(mathPage(limit[i*chunk+j])) # print(f"Creating MathID {limit[i*chunk+j]}") thread = threading.Thread(target = persons[j].getInfo) threads.append(thread) thread.start() for j, thread in enumerate(threads): thread.join() for j in range(chunk): print(f"Adding MathID {limit[i*chunk+j]} to database. Entry {i*chunk+j+1} of {l}.", end= '\r') self.insert_person(persons[j]) threads = list() persons = list() for j in range(r): persons.append(mathPage(limit[n*chunk+j])) # print(f"Creating MathID {limit[n*chunk+j]}") thread = threading.Thread(target = persons[j].getInfo) threads.append(thread) thread.start() for j, thread in enumerate(threads): thread.join() for j in range(r): print(f"Adding MathID {limit[i*chunk+j]} to database. Entry {i*chunk+j+1} of {l}.", end= '\r') self.insert_person(persons[j]) # print(f"Downloading entry {i} of {limit}", end= '\r') # page = mathPage(i) # self.insert_person(page) class mathGenealogy: pass total_entries = 297377 # number of records as of 2 October 2023 if __name__ == "__main__": testDB = mathDB("test.db") test = mathPage(0)