diff --git a/MathGen.py b/MathGen.py index 8378e6f..f5a1d3a 100644 --- a/MathGen.py +++ b/MathGen.py @@ -7,16 +7,20 @@ import sqlite3 from sqlite3 import Error import json import re +import webcolors as webc import logging +from subprocess import call + + #logging.warning('Watch out!') # will print a message to the console #logging.info('I told you so') # will not print anything class mathPage: - def __init__(self, mathID): + def __init__(self, mathID, verbose=False): """ Create an empty object only containing the ID - """ + """ self.id = mathID self.mirror = "http://mathgenealogy.org/" self.url = self.mirror+"id.php?id="+str(mathID) @@ -36,7 +40,8 @@ class mathPage: def get_info(self): - logging.info(f'Downloading info to mathID {self.id} from the online database.') + # logging.info(f'Downloading info to mathID {self.id} from the online database.') + print(f'Downloading info to mathID {self.id} from the online database.', end='\r') temp = requests.get(self.url) temp.encoding = "utf-8" self.page = temp.text @@ -46,6 +51,7 @@ class mathPage: self.inst = None self.diss = None self.advisorID = [None, None, None] + else: self.tree = html.fromstring(self.page) self.parse_page() @@ -185,6 +191,10 @@ class mathDB: conn = connection cur = conn.cursor() return(cur, conn) + + def kill_connection(self, conn, connection=None): + if connection == None: + conn.close() @@ -245,6 +255,45 @@ class mathDB: self.insert_person(p, connection=connection) except ValueError: print(f"Error: No entry with mathID {mathID} exists.") + + def insert_new_person(self, name, inst, year, diss, advisors, title, students={}, connection=None): + cur,conn = self.get_cursor(connection) + cur.execute("SELECT * FROM mathematicians ORDER BY id DESC LIMIT 1") + res = cur.fetchall() + lastID = res[0][0] + if lastID > int(1e6): + mathID = lastID+1 + else: + mathID = int(1e6) + if len(advisors) == 1: + advisors.append(0) + if len(advisors) == 2: + advisors.append(0) + entry = (mathID, name, title, inst, year, diss, advisors[0], advisors[1], advisors[2], json.dumps(students)) + entry_update = (name, title, inst, year, diss, advisors[0], advisors[1], advisors[2], json.dumps(students), mathID) + row = self.get_person(mathID, conn) + if row[0] == 0: + cur.execute("INSERT INTO mathematicians VALUES (?,?,?,?,?,?,?,?,?,?)", entry) + else: + cur.execute(""" + UPDATE mathematicians + SET name = ?, + title = ?, + institute = ?, + year = ?, + thesis = ?, + first_advisor = ?, + second_advisor = ?, + third_advisor = ?, + students = ? + WHERE id = ? + """, (entry_update)) + conn.commit() + if connection == None: + conn.close() + return(mathID) + + @@ -313,10 +362,10 @@ class mathDB: print(f"Error: No entry with mathID {mathID} exists.") p = self.get_person(mathID, connection=conn) students = json.loads(p[9]) - return(students["MathID"]) if connection == None: conn.close() - + return(students["MathID"]) + @@ -395,7 +444,7 @@ class mathDB: if connection == None: conn.close() if len(res) == 0: - return((0,0,0,0,0,0,0,0,0,0)) + return((0,0,0,0,0,0,0,0,0,'{}')) else: return(res[0]) @@ -465,13 +514,68 @@ class mathDB: if connection == None: conn.close() - def add_decendants(self, mathID, depth=0, connection=None): - pass + + + def get_descendants(self, mathID, depth=0, connection=None): + cur,conn = self.get_cursor(connection) + i = 0 + desc = {mathID} + desc_new = {mathID} + if not self.exists(mathID): + self.add_person(mathID) + while True: + desc_temp = set() + if i > depth and not depth == 0: + break + for d in desc_new: + p = self.get_person(d, connection=conn) + s = json.loads(p[9]) + if "MathID" in s.keys(): + students = s["MathID"] + desc_temp = desc_temp | set(students) + desc = desc | desc_new + desc_new = desc_temp + if len(desc_new) == 0: + break + i += 1 + if connection == None: + conn.close() + return(desc) + + def add_descendants(self, mathID, depth=0, connection=None): + cur,conn = self.get_cursor(connection) + i = 0 + desc = {mathID} + desc_new = {mathID} + if not self.exists(mathID): + self.add_person(mathID) + while True: + fetch = [] + desc_temp = set() + if i >= depth and not depth == 0: + break + for d in desc_new: + p = self.get_person(d, connection=conn) + s = json.loads(p[9]) + if "MathID" in s.keys(): + students = s["MathID"] + desc_temp = desc_temp | set(students) + desc = desc | desc_new + desc_new = desc_temp + if len(desc_new) == 0: + break + for d in desc_new: + if not self.exists(d): + fetch.append(d) + self.populate_db(fetch, connection=connection) + i += 1 + if connection == None: + conn.close() - def populate_db(self, limit, chunk = 10): - conn = sqlite3.connect(self.db_file) + def populate_db(self, limit, chunk = 10, connection=None): + cur,conn = self.get_cursor(connection) if type(limit) == int: limit = range(1, limit+1) try: @@ -509,58 +613,411 @@ class mathDB: self.insert_person(persons[j], conn) # print(f"Downloading entry {i} of {limit}", end= '\r') # page = mathPage(i) -# self.insert_person(page) - conn.close() +# self.insert_person(page) + if connection == None: + conn.close() class mathGenealogy(Graph): - def __init__(self, DB="MathGen.db", vertices = None, directed=True): + def __init__(self, DB = "MathGen.db", name = "graph", vertices = None, directed=True): self.db = mathDB(DB) + self.table_name = name + self.db_file = DB + self.init_db(name) + self.roots = [] super().__init__(directed=directed) self.vs["name"] = "" + self.__rank = True + self.__same_level = [] + self.__graphOptions = {} + self.__fontsize = "12" + self.__fontlarge = "14" + self.__edgeOptions = {} + self.__nodeOptions = {} + self.__graphOptions['ratio'] = 'auto' + self.__graphOptions['splines'] = 'spline' + self.__graphOptions['fontname'] = 'helvetica' + self.__graphOptions['bgcolor'] = 'white' + self.__graphOptions['ranksep'] = "0.8" + self.__graphOptions['newrank'] = "true" + self.__graphOptions['mclimit'] = "100" + self.__graphOptions['rankdir'] = 'TB' + self.__graphOptions['minlen'] = "0.1" + self.__graphOptions['nslimit'] = "10" + self.__nodeOptions["shape"] = "box" + self.__nodeOptions["rank"] = "same" + self.__nodeOptions["style"] = "rounded, filled" + self.__edgeOptions["penwidth"] = "2" + self.__edgeOptions["arrowhead"] = "vee" - def add_person(self, mathID, root=0, line=0, force=False): + def fixed_level(self, vIDs): + if type(vIDs) == int: + self.__same_level.append(vIDs) + else: + self.__same_level += vIDs + + + def config_graph(self, ratio="auto", mclimit="100", + splines="spline", ranksep="1", fontname="helvetica", + bgcolor="#fffff0", rank=True): + self.__graphOptions['ratio'] = ratio + self.__graphOptions['splines'] = splines + self.__graphOptions['fontname'] = fontname + self.__graphOptions['bgcolor'] = bgcolor + self.__graphOptions['ranksep'] = ranksep + self.__graphOptions['mclimit'] = mclimit + self.__rank = rank + + + def config_nodes(self, style="rounded, filled", shape="box", rank="same"): + self.__nodeOptions["shape"] = shape + self.__nodeOptions["rank"] = rank + self.__nodeOptions["style"] = style + + + + def config_edges(self, penwidth="2", arrowhead="vee"): + self.__edgeOptions["penwidth"] = penwidth + self.__edgeOptions["arrowhead"] = arrowhead + + + + + def create_connection(self): + """ create a database connection to a SQLite database """ + conn = None + try: + conn = sqlite3.connect(self.db_file) + #print(sqlite3.version) + except Error as e: + print(e) + finally: + if conn: + return(conn) + + + + # def create_table(self, conn, create_table_sql): + # """ create a table from the create_table_sql statement + # :param conn: Connection object + # :param create_table_sql: a CREATE TABLE statement + # :return: + # """ + # try: + # c = conn.cursor() + # c.execute(create_table_sql) + # except Error as e: + # print(e) + + def vID_to_mathID(self, vID, connection=None): + cur,conn = self.db.get_cursor(connection) + if type(vID) == int: + cur.execute(f"SELECT mathID FROM {self.table_name} WHERE vID = ?", (vID,)) + res = cur.fetchall() + if len(res) > 0: + return(res[0][0]) + else: + return(0) + elif type(vID) == list: + res = [] + for i in vID: + res.append(self.vID_to_mathID(i,conn)) + return(res) + else: + return(0) + + + + def mathID_to_vID(self, mathID, connection=None): + cur,conn = self.db.get_cursor(connection) + cur.execute(f"SELECT vID FROM {self.table_name} WHERE mathID = ?", (mathID,)) + res = cur.fetchall() + if len(res) > 0: + return(res[0][0]) + else: + return(0) + + + def get_person(self, vID, connection=None): + cur,conn = self.db.get_cursor(connection) + res = cur.execute(f"SELECT * FROM {self.table_name} WHERE vID = ?", (vID,)) + res = cur.fetchall() + if connection == None: + conn.close() + if len(res) == 0: + return((None,0,'{}')) + else: + return(res[0]) + + + def init_db(self, name, connection=None): + cur,conn = self.db.get_cursor(connection) + sql_create_graph_table = f"""CREATE TABLE IF NOT EXISTS {name} ( + vID integer PRIMARY KEY, + mathID integer NOT NULL, + meta text + );""" + if conn is not None: + cur.execute(f"DROP TABLE IF EXISTS {self.table_name}") + conn.commit() + self.db.create_table(conn, sql_create_graph_table) + conn.close() + else: + print("Cannot create database connection") + + + def insert_db_entry(self, mathID, vID, meta, connection = None): + cur,conn = self.db.get_cursor(connection) + row = self.get_person(vID, conn) + meta_text = json.dumps(meta) + if row[0] == None: + cur.execute(f"INSERT INTO {self.table_name} VALUES (?,?,?)", (vID, mathID, meta_text)) + else: + new_meta = {} + old_meta = json.loads(self.get_person(vID, conn)[2]) + new_meta["roots"] = list(set(old_meta["roots"] + meta["roots"])) + meta_text = json.dumps(new_meta) + cur.execute(f""" + UPDATE {self.table_name} + SET mathID = ?, + meta = ? + WHERE vID = ? + """, (mathID, meta_text, vID)) + conn.commit() + if connection == None: + conn.close() + + def get_advisors(self, vID, connection=None): + cur,conn = self.db.get_cursor(connection) + query = f"""SELECT mathematicians.first_advisor, + mathematicians.second_advisor, + mathematicians.third_advisor + FROM mathematicians + JOIN {self.table_name} + ON mathematicians.id = {self.table_name}.mathID + WHERE vID = ?""" + cur.execute(query, (vID,)) + res = cur.fetchall() + if len(res) > 0: + res = res[0] + else: + return(tuple()) + query = f"""SELECT vID FROM {self.table_name} WHERE mathID = ?""" + adv = [] + for a in res: + if a > 0: + cur.execute(query, (a,)) + advID = cur.fetchall()[0][0] + adv.append(advID) + return(tuple(adv)) + + + def get_students(self, vID, connection=None): + cur,conn = self.db.get_cursor(connection) + query = f""" + SELECT vID + FROM {self.table_name} + JOIN mathematicians + ON mathematicians.id = graph.mathID + WHERE first_advisor = (SELECT mathID FROM graph WHERE vID = ?) + OR second_advisor = (SELECT mathID FROM graph WHERE vID = ?) + OR third_advisor = (SELECT mathID FROM graph WHERE vID = ?)""" + cur.execute(query, (vID,vID,vID)) + res = cur.fetchall() + stud = [] + for s in res: + stud.append(s[0]) + return(stud) + + def add_new_link(self, vOUT, vIN): + if self.get_eid(vOUT, vIN, error=False) < 0: + self.add_edge(vOUT, vIN) + + + + def add_links(self, vID): + adv = self.get_advisors(vID) + stud = self.get_students(vID) + for a in adv: + self.add_new_link(a, vID) + for s in stud: + self.add_new_link(vID,s) + + def add_all_links(self): + for vID in range(len(self.vs)): + self.add_links(vID) + + + def add_person(self, mathID, root=None, line=None, force=False): person = self.db.get_person(mathID) + if root != None and root not in self.roots: + self.roots.append(root) if person[0] == 0: - p = mathPage(mathID) - p.get_info() - entry = p.get_entry() - if entry[1] == None: - raise ValueError("No such entry in the math genealogy project found.") - return - else: - self.db.insert_person(p) - person = self.db.get_person(mathID) + self.db.add_person(mathID) + # p = mathPage(mathID) + # p.get_info() + # entry = p.get_entry() + # if entry[1] == None: + # raise ValueError("No such entry in the math genealogy project found.") + # return + # else: + # self.db.insert_person(p) + person = self.db.get_person(mathID) if str(mathID) not in self.vs['name']: vID = self.add_vertex(str(mathID)).index + self.add_links(vID) self._insert_person(person, vID, root, line) + else: + vID = self.vs.find(name=str(mathID)).index + self.add_links(vID) + if force: + self._insert_person(person, vID, root, line) + else: + pass +# self.vs[vID]["roots"].append(root) + self.insert_db_entry(mathID, vID, {"roots": [root]}) + - elif force: - vID = graph.vs.find(name=str(mathID)).index - self._insert_person(person, vID, root, line) + def add_ancestors(self, mathID, depth=0, root=None, line=None): + self.db.add_ancestors(mathID, depth=depth) + ancestors = self.db.get_ancestors(mathID, depth=depth) + for a in ancestors: + self.add_person(a, root=mathID) + + + def add_descendants(self, mathID, depth=1, root=None, line=None): + self.db.add_descendants(mathID, depth=depth) + descendants = self.db.get_descendants(mathID, depth=depth) + for d in descendants: + self.add_person(d, root=mathID) + + + def add_new_person(self, name, inst, year, diss, advisors, title, students={}, root=None): + mathID = self.db.insert_new_person(name, inst, year, diss, advisors, title, students) + self.add_person(mathID, root=root) + def _insert_person(self, person, vID, root, line): students = json.loads(person[9]) + if len(students) > 0: + students = students["MathID"] + self.vs[vID]["Name"] = person[1] self.vs[vID]["Title"] = person[2] - self.vs[vID]["Year"] = str(person[3]) - self.vs[vID]["Dissertation"] = person[4] - self.vs[vID]["Institution"] = person[5] - self.vs[vID]["Students"] = students["MathID"] - self.vs[vID]["Advisors"] = person[6:8] - self.vs[vID]["Line"] = line - self.vs[vID]["roots"] = [root] + self.vs[vID]["Institution"] = person[3] + self.vs[vID]["Year"] = str(person[4]) + self.vs[vID]["Dissertation"] = person[5] + self.vs[vID]["label"] = self.__make_nice_label(vID) + #self.vs[vID]["Students"] = students + #self.vs[vID]["Advisors"] = person[6:8] + + + def get_clusterID(self, vID, connection=None): + out = 0 + p = self.get_person(vID, connection) + roots = json.loads(p[2])["roots"] + if roots == [None]: + return(-1) + for r in roots: + out = out + 2**self.roots.index(r) + return(out-1) - + def get_clusters(self, connection=None): + alphabet = "ABCDEFGHIJKLMOPQRSTUVXYZ" + cluster_dict = {} + cluster = {} + i = 0 + for r in self.roots: + cluster_dict.update({alphabet[i]:[r]}) + i += 1 + for k,v in cluster_dict.items(): + cluster.update({k : [self.mathID_to_vID(v[0])]}) + for vID in range(len(self.vs)): + p = self.get_person(vID, connection) + roots = json.loads(p[2])["roots"] + if roots == [None]: + continue + new = True + for k,v in cluster_dict.items(): + if roots == v and vID not in cluster[k]: + cluster[k].append(vID) + new = False + break + if new: +# print(roots) + key = [] + for r in roots: +# print(r) + for k,v in cluster_dict.items(): +# print(k,v) + if len(k) == 1 and r in v: + key.append(k) +# print(key) + key.sort() + new_key = "".join(key) +# print(new_key) +# print("\n") + cluster_dict.update({new_key: roots}) + cluster.update({new_key: [vID]}) + return(cluster_dict, cluster) + + def get_cluster_list(self): + clusters = [] + for vID in range(len(self.vs)): + c = self.get_clusterID(vID) + self.vs[vID]["cluster"] = c + if c not in clusters: + clusters.append(c) + return(clusters) + + def __cycle_color_list(self, colors, length): + color_list = [] + col = [] + try: + for c in colors: + color_list.append(c) + except TypeError: + color_list.append(colors) + for i in range(0, length): + j = i % len(color_list) +# print(j) + col.append(color_list[j]) + return(col) + + def __get_color_CSS(self, color): + if color[0] == "#" : + col = color + else: + col = webc.name_to_hex(color, spec='css3') + return(col) + + def color_graph_CSS(self, colors): + rootCnt = len(self.roots) + clusters = self.get_cluster_list() + #print("Total number of clusters found: "+ str(len(clusters))) + col = self.__cycle_color_list(colors, len(clusters)) + for vID in range(len(self.vs)): + clusterID = self.get_clusterID(vID) + v_col = self.__get_color_CSS(col[clusters.index(clusterID)]) + self.vs[vID]["color"] = v_col + self.vs[vID]["fillcolor"] = v_col + if self._get_lum(v_col) < 0.5: + self.vs[vID]["fontcolor"] = "#ffffff" + else: + self.vs[vID]["fontcolor"] = "#000000" + + + + # if "name" in vertex.keys(): # self.add_vertex(vertex["name"]) # else: @@ -572,17 +1029,18 @@ class mathGenealogy(Graph): - def _wrap_string(self, string, wrap): + def __wrap_string(self, string, wrap): out='' while (len(string)>wrap) : helpString=string[:(wrap+1)] i=helpString.rfind(' ') out = out + helpString[:i] + '
' string=string[(i+1):] - out = out + string + '' + out = out + string + '' return(out) - def _wrap_institute(self, string): + + def __wrap_institute(self, string): out='' while (string.find(' and ')>=0) : i=string.find('and') @@ -592,40 +1050,157 @@ class mathGenealogy(Graph): return(out) - def _make_nice_label(self, vID): - label = "<" + self.vs[vID]["Name"] + "
" + # def __make_nice_label(self, vID): + # label = f"<{self.vs[vID]['Name']}
" + # diss = self.vs[vID]["Dissertation"] + # if diss and not diss.isspace(): + # line2 = self.__wrap_string(diss, 60) + "
" + # line2 = line2.replace("&", "&") + # label = label + line2 + # inst = self.vs[vID]["Institution"] + # if inst and not inst.isspace(): + # inst = self.__wrap_institute(inst) + # else: + # inst = 'Unknown' + # year = self.vs[vID]["Year"] + # if not year or year.isspace(): + # year = '?' + # label= label + inst + ", " + year + ">" + # return(label) + + def __make_nice_label(self, vID): + fn = "" + fl = "" + nameLine = "" + self.vs[vID]["Name"] + "" + instLine = "" + dissLine = "" diss = self.vs[vID]["Dissertation"] - if diss and not diss.isspace(): - line2 = self._wrap_string(diss, 60) + "
" - line2 = line2.replace("&", "&") - label = label + line2 inst = self.vs[vID]["Institution"] if inst and not inst.isspace(): - inst = self._wrap_institute(inst) - else: - inst = 'Unknown' + inst = self.__wrap_institute(inst) + instLine = inst year = self.vs[vID]["Year"] - if not year or year.isspace(): - year = '?' - label= label + inst + ", " + year + ">" + if year and not year.isspace(): + if instLine and not instLine.isspace(): + instLine = instLine + ", " + year + "" + else: + instLine = "" + year + "" + if instLine or not instLine.isspace(): + instLine = "
" + instLine + if diss and not diss.isspace(): + diss = self.__wrap_string(diss, 60) + dissLine = "
" + diss.replace("&", "&") + if instLine or dissLine: + label = "<" +fl + nameLine +"
" + fn + instLine + dissLine + "
" + ">" + else: + label = "<" +fl + nameLine +"" + ">" return(label) + + + def _hex2rgb(self, color): + if color[0] == '#': + r = int(color[1:3], 16)/255 + g = int(color[3:5], 16)/255 + b = int(color[5:7], 16)/255 + color = [r,g,b] + return(color) + + def _rgb2hex(self, color): + col = [] + colstr = [] + colout = '#' + for i in range(0,3): + col.append(hex(round(color[i]*255))) + if len(col[i]) >3 : + colout = colout + col[i][2:4] +# colstr.append(col[i][2:4]) + else : + colout = colout + '0' + col[i][2:3] +# colstr.append('0' + col[i][2:3]) + return(colout) + + def __blendColorsHex(self, col1, col2, t): + col1 = self.__hex2rgb(col1) + col2 = self.__hex2rgb(col2) + col = [] + for i in range(0,3): + col.append(math.sqrt((1-t) * pow(col1[i],2) + t * pow(col2[i], 2))) + colstring = self.__rgb2hex(col) + return(colstring) + + + def __formatOptions(self, opts): + out = "[" + optCnt = len(opts) + i=0 + for opt in opts: + out = out + opt + '="' + str(opts[opt]) + '"' + if (optCnt-1) > i : + out = out + ", " + i += 1 + out = out + "]" + return(out) + + + + def save(self, filename): +# Algorithm not optimal. Might redo labels although they already exist. + self.add_all_links() + for edge in self.es: + if not "style" in edge.attribute_names() or not edge["style"]: + edge["style"] = "solid" + +# self.__graphOptions["bgcolor"] = self.__getColCSS(self.__backColor) + for vID in range(len(self.vs)): +# if not self.vs[vxID].has_attr("label") or self.graph.vs[vxID]["label"].isspace(): + self.vs[vID]["label"] = self.__make_nice_label(vID) +# if not self.graph.vs[vxID].has_attr("rank") or self.graph.vs[vxID]["rank"].isspace(): + if self.vs[vID]["Year"] and not self.vs[vID]["Year"].isspace(): + self.vs[vID]["rank"] = self.vs[vID]["Year"] + self.write_dot(filename) + with open(filename, 'r') as file : + filedata = file.read() + optionString = self.__formatOptions(self.__graphOptions) + nodeOptionString = self.__formatOptions(self.__nodeOptions) + edgeOptionString = self.__formatOptions(self.__edgeOptions) + + rankString = "" + + if self.__rank: + rankString = ' {rank = "same";' + for r in self.__same_level: + rID = self.vs.find(name=str(r)).index + rankString = rankString + str(rID) +';' + rankString = rankString+'}' + + # Replace the target string + break1 = filedata.find('\n') +1 + break2 = filedata[break1:].find('\n') + filedata = filedata[:(break1+break2)] + " graph " + optionString + "\n node " + nodeOptionString + "\n edge " + edgeOptionString + '\n' + rankString + filedata[(break1+break2):] + filedata = filedata.replace('"<', '<') + filedata = filedata.replace('>"', '>') + # Write the file out again + with open(filename, 'w') as file: + file.write(filedata) + + def _get_lum(self, col): + rgb = self._hex2rgb(col) + lum = (0.2126*rgb[0] + 0.7152*rgb[1] + 0.0722*rgb[2]) + return(lum) - - -total_entries = 297377 # number of records as of 2 October 2023 - -Greven_ID = 29360 # Andreas Greven -Aumann_ID= 36548 # Georg Aumann -Anita_ID = 92324 # Anita -Fourier_ID = 17981 # Fourier -Wolfgang_ID = 150286 # Wolfgang -Eichelsbacher_ID = 27275 # Peter -Anton_ID = 125956 # Anton -Pfaffelhuber_ID = 157881 # Peter Pfaffelhuber -Ruess_ID = 75966 # Ruess - - -if __name__ == "__main__": - testDB = mathDB("test.db") - test = mathPage(0) + def draw_graph(self, filename="graph.pdf", engine="pdf", bgcolor="white", clean=True): + i = filename.rfind('.') + if i > 0: + fn = filename[:i] + else: + fn = filename + if bgcolor != "white": + self.__backColor = bgcolor + if not filename: + filename = "tmp_"+str(random.randint(1,9999999)) + self.save(fn+".dot") + call(["dot", f"-T{engine}", f"{fn}.dot", "-o", f"{fn}.{engine}"]) + if clean: + call(["rm", f"{fn}.dot"]) +