Skip to content

Commit

Permalink
feat: add create_table function to facilitate dynamic table creation …
Browse files Browse the repository at this point in the history
…in the database
  • Loading branch information
RichieHakim committed Nov 23, 2024
1 parent ccb20db commit 60533bc
Showing 1 changed file with 116 additions and 15 deletions.
131 changes: 116 additions & 15 deletions bnpm/sql_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,60 @@ def drop_database(
connection.execute(sqlalchemy.text(query))


def create_table(
url: str,
table: str,
columns: Dict[str, str],
database: Optional[str] = None,
kwargs_connection: Optional[Dict[str, Any]] = {},
) -> None:
"""
Creates a new table in the database.
RH 2024
Args:
url (str):
URL for the database connection
table (str):
Name of the table to create
columns (dict):
Dictionary of column names and types. Example: {"column1": "INT",
"column2": "VARCHAR(255)"}
database (str):
Name of the database. If None, then the default database is used.
kwargs_connection (dict):
Additional keyword arguments to be passed to sqlalchemy.create_engine
"""
engine = sqlalchemy.create_engine(url, **kwargs_connection)
if "mysql" in str(engine):
if database is None:
raise ValueError("database must be specified for MySQL")
query = f"CREATE TABLE {database}.{table} ("
for column, column_type in columns.items():
query += f"{column} {column_type}, "
query = query[:-2] + ")"
elif "postgresql" in str(engine):
if database is None:
query = f"CREATE TABLE {table} ("
else:
query = f"CREATE TABLE {database}.{table} ("
for column, column_type in columns.items():
query += f"{column} {column_type}, "
query = query[:-2] + ")"
elif "sqlite" in str(engine):
if database is not None:
raise ValueError("database must be None for SQLite")
query = f"CREATE TABLE {table} ("
for column, column_type in columns.items():
query += f"{column} {column_type}, "
query = query[:-2] + ")"
else:
raise ValueError("Connection type not recognized")

with engine.begin() as connection:
connection.execute(sqlalchemy.text(query))


def drop_table(
url: str,
table: str,
Expand All @@ -418,24 +472,71 @@ def drop_table(
Additional keyword arguments to be passed to sqlalchemy.create_engine
"""
engine = sqlalchemy.create_engine(url, **kwargs_connection)
if isinstance(connection, sqlalchemy.engine.base.Connection):
if "mysql" in str(engine):
if database is None:
raise ValueError("database must be specified for MySQL")
query = f"DROP TABLE {database}.{table}"
elif "postgresql" in str(engine):
if database is None:
query = f"DROP TABLE {table}"
else:
query = f"DROP TABLE {database}.{table}"
elif "sqlite" in str(engine):
if database is not None:
raise ValueError("database must be None for SQLite")
if "mysql" in str(engine):
if database is None:
raise ValueError("database must be specified for MySQL")
query = f"DROP TABLE {database}.{table}"
elif "postgresql" in str(engine):
if database is None:
query = f"DROP TABLE {table}"
else:
raise ValueError("Connection type not recognized")
query = f"DROP TABLE {database}.{table}"
elif "sqlite" in str(engine):
if database is not None:
raise ValueError("database must be None for SQLite")
query = f"DROP TABLE {table}"
else:
raise TypeError("connection must be a sqlalchemy.engine.base.Connection object")
raise ValueError("Connection type not recognized")

with engine.begin() as connection:
connection.execute(sqlalchemy.text(query))


# def insert_data(
# url: str,
# table: str,
# data: Union[Dict[str, Any], List[Dict[str, Any]]],
# database: Optional[str] = None,
# kwargs_connection: Optional[Dict[str, Any]] = {},
# ) -> None:
# """
# Inserts data into the table.
# RH 2024

# Args:
# url (str):
# URL for the database connection
# table (str):
# Name of the table to insert data into
# data (dict or list of dict):
# Data to insert. If a single dictionary, then a single row is inserted.
# If a list of dictionaries, then multiple rows are inserted.
# database (str):
# Name of the database. If None, then the default database is used.
# kwargs_connection (dict):
# Additional keyword arguments to be passed to sqlalchemy.create_engine
# """
# engine = sqlalchemy.create_engine(url, **kwargs_connection)
# if "mysql" in str(engine):
# if database is None:
# raise ValueError("database must be specified for MySQL")
# if isinstance(data, dict):
# query = f"INSERT INTO {database}.{table} ({', '.join(data.keys())}) VALUES ({', '.join([str(value) for value in data.values()])})"
# elif isinstance(data, list):
# query = f"INSERT INTO {database}.{table} ({', '.join(data[0].keys())}) VALUES "
# for row in data:
# query += f"({', '.join([str(value) for value in row.values()])}), "
# query = query[:-2]
# elif "postgresql" in str(engine):
# if database is None:
# if isinstance(data, dict):
# query = f"INSERT INTO {table} ({', '.join(data.keys())}) VALUES ({', '.join([str(value) for value in data.values()])})"
# elif isinstance(data, list):
# query = f"INSERT INTO {table} ({', '.join(data[0].keys())}) VALUES "
# for row in data:
# query += f"({', '.join([str(value) for value in row.values()])}), "
# query = query[:-2]
# else:
# if isinstance(data, dict):
# query = f"INSERT INTO {database}.{table} ({', '.join(data.keys())}) VALUES ({', '.join([str(value) for value in data.values()])})"
# elif

0 comments on commit 60533bc

Please sign in to comment.