Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
judahrand committed Feb 23, 2022
2 parents 2cf4239 + e573db1 commit 6569a4e
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 41 deletions.
3 changes: 2 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# import sys
# sys.path.insert(0, os.path.abspath('.'))
import datetime
import importlib.metadata


def setup(app):
Expand All @@ -25,7 +26,7 @@ def setup(app):
project = 'PipelineWise'
copyright = f'{datetime.datetime.now().year}, Wise Ltd.'
author = 'Wise'
version = '0.35.2'
version = importlib.metadata.version('pipelinewise')


# -- General configuration ---------------------------------------------------
Expand Down
62 changes: 29 additions & 33 deletions pipelinewise/cli/pipelinewise.py
Original file line number Diff line number Diff line change
Expand Up @@ -1271,30 +1271,21 @@ def run_tap(self):

except pidfile.AlreadyRunningError:
self.logger.error('Another instance of the tap is already running.')
utils.silentremove(cons_target_config)
utils.silentremove(tap_properties_fastsync)
utils.silentremove(tap_properties_singer)
sys.exit(1)
# Delete temp files if there is any
except commands.RunCommandException as exc:
self.logger.exception(exc)
utils.silentremove(cons_target_config)
utils.silentremove(tap_properties_fastsync)
utils.silentremove(tap_properties_singer)
self._print_tap_run_summary(self.STATUS_FAILED, start_time, datetime.now())
self.send_alert(message=f'{tap_id} tap failed', exc=exc)
sys.exit(1)
except Exception as exc:
utils.silentremove(cons_target_config)
utils.silentremove(tap_properties_fastsync)
utils.silentremove(tap_properties_singer)
self._print_tap_run_summary(self.STATUS_FAILED, start_time, datetime.now())
self.send_alert(message=f'{tap_id} tap failed', exc=exc)
raise exc

utils.silentremove(cons_target_config)
utils.silentremove(tap_properties_fastsync)
utils.silentremove(tap_properties_singer)
finally:
utils.silentremove(cons_target_config)
utils.silentremove(tap_properties_fastsync)
utils.silentremove(tap_properties_singer)
self._print_tap_run_summary(self.STATUS_SUCCESS, start_time, datetime.now())

# pylint: disable=unused-argument
Expand All @@ -1310,23 +1301,18 @@ def stop_tap(self, sig=None, frame=None):
try:
with open(pidfile_path, encoding='utf-8') as pidf:
pid = int(pidf.read())
pgid = os.getpgid(pid)
parent = psutil.Process(pid)

# Terminate child processes
child = parent.children()[-1]
self.logger.info('Sending SIGTERM to child pid %s...', child.pid)
child.terminate()
child.wait()

# Rename log files from running to terminated status
if self.tap_run_log_file:
tap_run_log_file_running = f'{self.tap_run_log_file}.running'
tap_run_log_file_terminated = f'{self.tap_run_log_file}.terminated'

if os.path.isfile(tap_run_log_file_running):
os.rename(tap_run_log_file_running, tap_run_log_file_terminated)

sys.exit(1)
# Terminate all the processes in the current process' process group.
for child in parent.children(recursive=True):
if os.getpgid(child.pid) == pgid:
self.logger.info('Sending SIGTERM to child pid %s...', child.pid)
child.terminate()
try:
child.wait(timeout=5)
except psutil.TimeoutExpired:
child.kill()

except ProcessLookupError:
self.logger.error(
Expand All @@ -1342,6 +1328,19 @@ def stop_tap(self, sig=None, frame=None):
)
sys.exit(1)

# Remove pidfile.
os.remove(pidfile_path)

# Rename log files from running to terminated status
if self.tap_run_log_file:
tap_run_log_file_running = f'{self.tap_run_log_file}.running'
tap_run_log_file_terminated = f'{self.tap_run_log_file}.terminated'

if os.path.isfile(tap_run_log_file_running):
os.rename(tap_run_log_file_running, tap_run_log_file_terminated)

sys.exit(1)

# pylint: disable=too-many-locals
def sync_tables(self):
"""
Expand Down Expand Up @@ -1453,20 +1452,17 @@ def sync_tables(self):

except pidfile.AlreadyRunningError:
self.logger.error('Another instance of the tap is already running.')
utils.silentremove(cons_target_config)
sys.exit(1)
# Delete temp file if there is any
except commands.RunCommandException as exc:
self.logger.exception(exc)
utils.silentremove(cons_target_config)
self.send_alert(message=f'Failed to sync tables in {tap_id} tap', exc=exc)
sys.exit(1)
except Exception as exc:
utils.silentremove(cons_target_config)
self.send_alert(message=f'Failed to sync tables in {tap_id} tap', exc=exc)
raise exc

utils.silentremove(cons_target_config)
finally:
utils.silentremove(cons_target_config)

def validate(self):
"""
Expand Down
12 changes: 6 additions & 6 deletions pipelinewise/fastsync/commons/tap_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def query(self, query, conn=None, params=None, return_as_cursor=False, n_retry=1
conn = self.conn

try:
with conn as cur:
with conn.cursor() as cur:
cur.execute(query, params)

if return_as_cursor:
Expand Down Expand Up @@ -277,10 +277,10 @@ def get_table_columns(self, table_name, max_num=None, date_type='date'):
table_name = table_dict.get('table_name')

sql = f"""
SELECT column_name,
data_type,
column_type,
safe_sql_value
SELECT column_name AS column_name,
data_type AS data_type,
column_type AS column_type,
safe_sql_value AS safe_sql_value
FROM (SELECT column_name,
data_type,
column_type,
Expand Down Expand Up @@ -378,7 +378,7 @@ def copy_table(
)
export_batch_rows = self.connection_config['export_batch_rows']
exported_rows = 0
with self.conn_unbuffered as cur:
with self.conn_unbuffered.cursor() as cur:
cur.execute(sql)
gzip_splitter = split_gzip.open(
path,
Expand Down
1 change: 1 addition & 0 deletions scripts/publish_docs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ python3 -m venv ~/venv-doc
. ~/venv-doc/bin/activate
pip install --upgrade pip
pip install sphinx sphinx-rtd-theme
pip install -e .

# CD into docs, make them. If you're not using Sphinx, you'll probably
# have a different build script.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
'argparse==1.4.0',
'tabulate==0.8.9',
'PyYAML==6.0',
'ansible==4.7.0',
'ansible-core==2.11.8',
'Jinja2==3.0.2',
'joblib==1.1.0',
'PyMySQL==0.7.11',
Expand Down

0 comments on commit 6569a4e

Please sign in to comment.