added PostgreSQL I/O
rbolgaryn committed Jul 18, 2022
1 parent 10717fa commit 4516a37
Showing 5 changed files with 201 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.rst
@@ -11,6 +11,7 @@ Change Log
- [ADDED] parameter in_ka for rated switch current
- [ADDED] current and loading result for switches
- [FIXED] bug for disabled continuous tap controllers
- [ADDED] File I/O download and upload pandapowerNet to PostgreSQL

[2.9.0] - 2022-03-23
----------------------
61 changes: 61 additions & 0 deletions pandapower/file_io.py
@@ -8,6 +8,8 @@
import os
import pickle
from warnings import warn
import psycopg2
import psycopg2.errors

import numpy
import pandas as pd
@@ -30,6 +32,14 @@
from pandapower.create import create_empty_network
import pandapower.io_utils as io_utils

try:
import pandaplan.core.pplog as logging
except ImportError:
import logging

logger = logging.getLogger(__name__)


def to_pickle(net, filename):
"""
Saves a pandapower Network with the pickle library.
@@ -407,3 +417,54 @@ def from_sqlite(filename, netname=""):
net = from_sql(con)
con.close()
return net


def from_postgresql(schema, host, user, password, database, include_results=False, **id_columns):
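    """
    Downloads a pandapowerNet from PostgreSQL tables named after the pandapower element
    tables (optionally prefixed with the given schema). Result tables (res_*) are only read
    if include_results is True; keyword arguments in id_columns filter the rows, e.g. grid_id=123.
    """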
# id_columns: {id_column_1: id_value_1, id_column_2: id_value_2}
net = create_empty_network()

conn = psycopg2.connect(host=host, user=user, password=password, database=database)
cursor = conn.cursor()
try:
for element, element_table in net.items():
if not isinstance(element_table, pd.DataFrame) or (element.startswith("res_") and not include_results):
continue
table_name = element if schema is None else f"{schema}.{element}"

try:
tab = io_utils.download_sql_table(cursor, table_name, **id_columns)
except UserWarning as err:
logger.debug(err)
continue
except psycopg2.errors.UndefinedTable as err:
logger.info(f"skipped {element} due to error: {err}")
continue
            except psycopg2.errors.UndefinedColumn as err:
                # the failed query leaves the transaction aborted; close the stale connection,
                # reconnect and retry the download without the id_columns filter
                conn.close()
                conn = psycopg2.connect(host=host, user=user, password=password, database=database)
                cursor = conn.cursor()
                logger.info(f"retrying {element} without id_columns due to error: {err}")
                tab = io_utils.download_sql_table(cursor, table_name)

if not tab.empty:
# preserve dtypes
columns = [c for c in element_table.columns if c in tab.columns]
tab[columns] = tab[columns].astype(element_table[columns].dtypes)
net[element] = pd.concat([element_table, tab])
logger.debug(f"downloaded table {element}")
finally:
conn.close()
return net


def to_postgresql(net, host, user, password, database, schema, include_results=False, **id_columns):
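    """
    Uploads every non-empty element table of the pandapowerNet to PostgreSQL, creating missing
    tables and columns on the fly. Result tables (res_*) are only written if include_results is
    True; keyword arguments in id_columns are stored as extra columns in every row, e.g. grid_id=123.
    """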
logger.info(f"Uploading the grid data to the DB schema {schema}")
with psycopg2.connect(host=host, user=user, password=password, database=database) as conn:
cursor = conn.cursor()
for element, element_table in net.items():
if not isinstance(element_table, pd.DataFrame) or net[element].empty or \
(element.startswith("res_") and not include_results):
continue
table_name = element if schema is None else f"{schema}.{element}"
io_utils.upload_sql_table(conn, cursor, table_name, element_table, **id_columns)
logger.debug(f"uploaded table {element}")

113 changes: 113 additions & 0 deletions pandapower/io_utils.py
@@ -17,6 +17,10 @@
from warnings import warn
import numpy as np

import psycopg2
import psycopg2.errors
import psycopg2.extras

import networkx
import numpy
import pandas as pd
@@ -88,6 +92,115 @@
logger = logging.getLogger(__name__)


def match_sql_type(dtype):
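    """Maps a pandas/numpy dtype name to the matching PostgreSQL column type."""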
if dtype in ("float", "float32", "float64"):
return "double precision"
elif dtype in ("int", "int32", "int64", "uint32", "uint64"):
return "integer"
elif dtype in ("object", "str"):
return "varchar"
elif dtype == "bool":
return "boolean"
else:
raise UserWarning(f"unsupported type {dtype}")


def check_if_sql_table_exists(cursor, table_name):
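    """Returns True if the table is listed in information_schema.tables; table_name is expected as "schema.table"."""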
query = f"SELECT EXISTS (SELECT FROM information_schema.tables " \
f"WHERE table_schema = '{table_name.split('.')[0]}' " \
f"AND table_name = '{table_name.split('.')[-1]}');"
cursor.execute(query)
(exists,) = cursor.fetchone()
return exists


def get_sql_table_columns(cursor, table_name):
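    """Returns the column names of an existing table, read from information_schema.columns."""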
query = f"SELECT * FROM information_schema.columns " \
f"WHERE table_schema = '{table_name.split('.')[0]}' " \
f"AND table_name = '{table_name.split('.')[-1]}';"
cursor.execute(query)
colnames = [desc[0] for desc in cursor.description]
list_idx = colnames.index("column_name")
columns_data = cursor.fetchall()
columns = [c[list_idx] for c in columns_data]
return columns


def download_sql_table(cursor, table_name, **id_columns):
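    """
    Reads a table into a pandas DataFrame. Keyword arguments in id_columns are turned into a
    WHERE clause and dropped from the result; a "<table>_id" column, if present, becomes the index.
    """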
# first we check if table exists:
exists = check_if_sql_table_exists(cursor, table_name)
if not exists:
raise UserWarning(f"table {table_name} does not exist or the user has no access to it")

if len(id_columns.keys()) == 0:
query = f"SELECT * FROM {table_name}"
else:
columns_string = ' and '.join([f"{str(k)} = '{str(v)}'" for k, v in id_columns.items()])
query = f"SELECT * FROM {table_name} WHERE {columns_string}"

cursor.execute(query)
colnames = [desc[0] for desc in cursor.description]
table = cursor.fetchall()
df = pd.DataFrame(table, columns=colnames)
index_name = f"{table_name.split('.')[-1]}_id"
if index_name in df.columns:
df.set_index(index_name, inplace=True)
if len(id_columns) > 0:
df.drop(id_columns.keys(), axis=1, inplace=True)
return df


def upload_sql_table(conn, cursor, table_name, table, create_new=True, **id_columns):
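    """
    Writes a DataFrame to a table with psycopg2.extras.execute_batch. The table is created if
    missing (when create_new is True), missing columns are added, the DataFrame index is stored
    in the "<table>_id" column and the id_columns values are appended to every row.
    """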
    # Create a list of tuples from the dataframe values
if len(id_columns.keys()) > 0:
tuples = [(*tuple(x), *id_columns.values()) for x in table.itertuples(index=True)]
else:
tuples = [tuple(x) for x in table.itertuples(index=True)]

# Comma-separated dataframe columns
index_name = f"{table_name.split('.')[-1]}_id"
sql_columns = [index_name, *table.columns, *id_columns.keys()]
sql_column_types = [match_sql_type(str(table.index.dtype)),
*[match_sql_type(t) for t in table.dtypes.astype(str).values],
*[match_sql_type(np.result_type(type(v)).name) for v in id_columns.values()]]
    placeholders = '%s,' * (len(table.columns) + len(id_columns.keys())) + '%s'  # one extra %s for the index column

if create_new and not check_if_sql_table_exists(cursor, table_name):
# create_table_if_not_exists(conn, cursor, table_name, tab.dtypes, ["not null" for _ in sql_columns], schema)
# create_table_if_not_exists(conn, cursor, table_name, tab.dtypes, "not null", schema)
create_sql_table_if_not_exists(conn, cursor, table_name, sql_columns, sql_column_types, None)
logger.info(f"created new table {table_name}")

# check if all columns already exist and if not, add more columns
existing_columns = get_sql_table_columns(cursor, table_name)
new_columns = [(c, t) for c, t in zip(sql_columns, sql_column_types) if c not in existing_columns]
if len(new_columns) > 0:
logger.info(f"adding columns {new_columns} to table {table_name}")
column_statement = ", ".join(f"ADD COLUMN {c} {t}" for c, t in new_columns)
query = f"ALTER TABLE {table_name} {column_statement};"
cursor.execute(query)

# SQL query to execute
query = f"INSERT INTO {table_name}({','.join(sql_columns)}) VALUES({placeholders})"
# batch_size = 1000
# for chunk in tqdm(chunked(tuples, batch_size)):
# cursor.executemany(query, chunk)
# conn.commit()
psycopg2.extras.execute_batch(cursor, query, tuples, page_size=100)
conn.commit()


def create_sql_table_if_not_exists(conn, cursor, table_name, sql_columns, sql_column_types, constraints):
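    """Issues CREATE TABLE IF NOT EXISTS with the given columns, types and optional per-column constraints."""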
if constraints is not None:
        cols_statement = ", ".join([" ".join(t) for t in zip(sql_columns, sql_column_types, constraints)])
else:
        cols_statement = ", ".join([" ".join(t) for t in zip(sql_columns, sql_column_types)])

query = f"CREATE TABLE IF NOT EXISTS {table_name}({cols_statement});"
cursor.execute(query)
conn.commit()


def coords_to_df(value, geotype="line"):
columns = ["x", "y", "coords"] if geotype == "bus" else ["coords"]
geo = pd.DataFrame(columns=columns, index=value.index)
25 changes: 25 additions & 0 deletions pandapower/test/api/test_file_io.py
@@ -20,6 +20,8 @@
from pandapower.test.toolbox import assert_net_equal, create_test_network, create_test_network2
from pandapower.timeseries import DFData

import testing.postgresql

try:
import geopandas as gpd
GEOPANDAS_INSTALLED = True
@@ -427,5 +429,28 @@ def test_json_io_with_characteristics(net_in):
assert np.isclose(net_out.characteristic.object.at[c2.index](2.5), c2(2.5), rtol=0, atol=1e-12)


def test_postgresql(net_in):
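    # round-trip test: requires the testing.postgresql package and a locally installed
    # PostgreSQL server, which is started as a temporary throwaway instance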
    # Launch a new PostgreSQL server
with testing.postgresql.Postgresql() as postgresql:
connect_data = postgresql.dsn()
# net_in = pp.networks.mv_oberrhein()
# net_in.switch["in_ka"] = np.nan
# connect_data = {"host": "localhost",
# "user": "test",
# "database": "sandbox",
# "password": "secret"}
id_columns = {"grid_id": 123, "another_id": "another_id_val"}
pp.to_postgresql(net_in, schema="robustplan", include_results=True, **connect_data, **id_columns)

net_out = pp.from_postgresql(schema="robustplan", include_results=True, **connect_data, **id_columns)

for element, table in net_in.items():
# dictionaries (e.g. std_type) not included
# json serialization/deserialization of objects not implemented
if not isinstance(table, pd.DataFrame) or table.empty or element == "line_geodata":
continue
assert pp.dataframes_equal(table, net_out[element]), element


if __name__ == "__main__":
pytest.main([__file__, "-s"])
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@
"plotting": ["plotly", "matplotlib", "python-igraph", "geopandas"],
# "shapely", "pyproj" are depedencies of geopandas and so already available;
# "base64", "hashlib", "zlib" produce installing problems, so they are not included
"test": ["pytest", "pytest-xdist"],
"test": ["pytest", "pytest-xdist", "testing.postgresql"],
"performance": ["ortools"], # , "lightsim2grid"],
"fileio": ["xlsxwriter", "openpyxl", "cryptography", "geopandas"],
# "fiona" is a depedency of geopandas and so already available
