Source code for scrapers.teiid

# SPDX-License-Identifier: GPL-3.0+
# Inspired from the MARS project

from __future__ import unicode_literals

from datetime import datetime, timedelta
from time import sleep

import psycopg2

from estuary import log


class Teiid(object):
    """Abstracts interfacing with Teiid to simplify connections and queries."""

    def __init__(self, host, port, username, password):
        """
        Initialize the Teiid class.

        :param str host: the Teiid FQDN or IP address
        :param int port: the port to connect to Teiid with
        :param str username: the username to connect to Teiid with
        :param str password: the password to connect to Teiid with
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        # a dict mapping db names to psycopg2 connections
        self._connections = {}
        # UTC timestamp of the last successful query; used to throttle queries
        self._last_query_dt = None

    def get_connection(self, db_name, force_new=False, retry=None):
        """
        Return an existing psycopg2 connection and establish it if needed.

        When a new connection replaces a cached one (``force_new=True``), the previously cached
        connection is closed so that it is not leaked.

        :param str db_name: the database name to get a connection to
        :kwarg bool force_new: forces a new database connection even if one already exists
        :kwarg int retry: the number of times to retry a failed connection. If this is not set,
            then the Teiid connection attempt will be repeated until it is successful.
        :return: a connection to Teiid
        :rtype: psycopg2 connection
        :raises ValueError: if ``retry`` is set to a value less than 1
        :raises psycopg2.OperationalError: if the connection still fails after ``retry`` retries
        :raises psycopg2.InternalError: if the connection still fails after ``retry`` retries
        """
        if not force_new and db_name in self._connections:
            return self._connections[db_name]

        if retry is not None and retry < 1:
            raise ValueError('The retry keyword must contain a value greater than 0')

        log.debug('Connecting to Teiid host {0}:{1}'.format(self.host, self.port))
        attempts = 0
        while True:
            attempts += 1
            try:
                conn = psycopg2.connect(
                    database=db_name,
                    host=self.host,
                    port=str(self.port),
                    user=self.username,
                    password=self.password,
                    connect_timeout=300
                )
                break
            except (psycopg2.OperationalError, psycopg2.InternalError) as e:
                # The first attempt is not a retry, hence ">" rather than ">="
                if retry and attempts > retry:
                    raise
                log.exception(e)
                log.warning('The Teiid connection failed on attempt {0}. Sleeping for 60 '
                            'seconds.'.format(attempts))
                sleep(60)

        # Teiid does not support setting this value at all and unless we
        # specify ISOLATION_LEVEL_AUTOCOMMIT (zero), psycopg2 will send a
        # SET command that the Teiid server doesn't understand.
        conn.set_isolation_level(0)

        # Only discard the previous connection once its replacement is ready, so the cache never
        # ends up holding a closed connection if the reconnect attempt raises.
        stale_conn = self._connections.get(db_name)
        if stale_conn is not None:
            try:
                stale_conn.close()
            except psycopg2.Error as e:
                log.debug('Failed to close the stale Teiid connection: {0}'.format(e))

        self._connections[db_name] = conn
        return conn

    def query(self, sql, db='public', retry=None):
        """
        Send the SQL query to Teiid and return the rows as a list.

        Queries are throttled so that at least half a second passes between the end of the last
        successful query and the start of the next one. Failed queries are retried on a fresh
        connection with a backoff that doubles on every failure, capped at 15 minutes.

        Note that ``retry`` only bounds the query attempts; establishing the connection itself is
        repeated until it succeeds.

        :param str sql: the SQL query to send to the database
        :kwarg str db: the database name to query on
        :kwarg int retry: the number of times to retry a failed query. If this is not set,
            then the Teiid query will be repeated until it is successful.
        :return: a list of rows from Teiid. Each row is a dictionary with the column headers as
            the keys.
        :rtype: list
        :raises ValueError: if ``retry`` is set to a value less than 1
        :raises psycopg2.OperationalError: if the query has a parsing error or still fails after
            ``retry`` retries
        :raises psycopg2.InternalError: if the query has a parsing error or still fails after
            ``retry`` retries
        """
        # Validate the arguments before doing any (potentially blocking) network I/O
        if retry is not None and retry < 1:
            raise ValueError('The retry keyword must contain a value greater than 0')

        con = self.get_connection(db)
        cursor = con.cursor()

        if self._last_query_dt:
            min_interval = timedelta(seconds=0.5)
            elapsed = datetime.utcnow() - self._last_query_dt
            remaining = min_interval - elapsed
            # Sleep only for what is left of the interval. The upper bound guards against a
            # negative "elapsed" value caused by the system clock being set backwards.
            if timedelta(0) < remaining <= min_interval:
                sleep(remaining.total_seconds())

        log.debug('Querying Teiid DB "{0}" with SQL:\n{1}'.format(db, sql))

        fifteen_mins = 15 * 60
        backoff = 30
        attempts = 0
        while True:
            attempts += 1
            try:
                if attempts > 1:
                    # Restart the database connection after failed queries
                    con = self.get_connection(db, force_new=True)
                    cursor = con.cursor()
                cursor.execute(sql)
                self._last_query_dt = datetime.utcnow()
                break
            except (psycopg2.OperationalError, psycopg2.InternalError) as e:
                # The first attempt is not a retry, hence ">" rather than ">="
                if retry and attempts > retry:
                    raise
                # Raise an exception if the query failed due to a syntax error since retrying
                # can't fix it. pgerror is None when the error didn't originate from the server.
                if e.pgerror and 'Parsing error' in e.pgerror:
                    raise

                log.exception(e)
                # Double the backoff time without ever exceeding 15 minutes
                backoff = min(backoff * 2, fifteen_mins)
                log.warning('The Teiid query failed on attempt {0}. Sleeping for {1} seconds.'
                            .format(attempts, backoff))
                sleep(backoff)

        try:
            data = cursor.fetchall()
            # column header names
            cols = [column[0] for column in cursor.description or []]
        finally:
            cursor.close()

        log.debug('Found the following columns: {}'.format(cols))
        log.debug('Received {} rows from Teiid'.format(len(data)))
        # build a return array with all columns
        return [dict(zip(cols, row)) for row in data]