Viewing file: pg8000.py (8.21 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# postgresql/pg8000.py
# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
""" .. dialect:: postgresql+pg8000 :name: pg8000 :dbapi: pg8000 :connectstring: \ postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...] :url: https://pythonhosted.org/pg8000/
.. _pg8000_unicode:

Unicode
-------

pg8000 will encode / decode string values between it and the server using the PostgreSQL ``client_encoding`` parameter; by default this is the value in the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``. Typically, this can be changed to ``utf-8``, as a more useful default::

    #client_encoding = sql_ascii # actually, defaults to database
                                 # encoding
    client_encoding = utf8

The ``client_encoding`` can be overridden for a session by executing the SQL:
SET CLIENT_ENCODING TO 'utf8';
SQLAlchemy will execute this SQL on all new connections based on the value passed to :func:`.create_engine` using the ``client_encoding`` parameter::

    engine = create_engine(
        "postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8')

.. _pg8000_isolation_level:

pg8000 Transaction Isolation Level
-------------------------------------

The pg8000 dialect offers the same isolation level settings as that of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect:

* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``AUTOCOMMIT``

.. versionadded:: 0.9.5 support for AUTOCOMMIT isolation level when using pg8000.

.. seealso::

    :ref:`postgresql_isolation_level`

    :ref:`psycopg2_isolation_level`

""" from ... import util, exc import decimal from ... import processors from ... import types as sqltypes from .base import ( PGDialect, PGCompiler, PGIdentifierPreparer, PGExecutionContext, _DECIMAL_TYPES, _FLOAT_TYPES, _INT_TYPES) import re from sqlalchemy.dialects.postgresql.json import JSON
class _PGNumeric(sqltypes.Numeric):
    """Numeric type whose result handling accounts for pg8000's native types."""

    def result_processor(self, dialect, coltype):
        """Return a converter for result values of ``coltype``, or ``None``.

        :param dialect: the dialect in use (not consulted here).
        :param coltype: the PostgreSQL type code of the result column.
        :return: a callable converting each value, or ``None`` when the
            value delivered by pg8000 needs no conversion.
        :raises exc.InvalidRequestError: if ``coltype`` is in none of
            ``_FLOAT_TYPES``, ``_DECIMAL_TYPES`` or ``_INT_TYPES``.
        """
        is_float = coltype in _FLOAT_TYPES
        is_exact = coltype in _DECIMAL_TYPES or coltype in _INT_TYPES

        # Reject unrecognised type codes up front; the outcome is the same
        # whether or not Decimal results were requested.
        if not (is_float or is_exact):
            raise exc.InvalidRequestError(
                "Unknown PG numeric type: %d" % coltype)

        if self.asdecimal:
            if is_float:
                return processors.to_decimal_processor_factory(
                    decimal.Decimal,
                    self._effective_decimal_return_scale)
            # pg8000 returns Decimal natively for 1700
            return None

        if is_float:
            # pg8000 returns float natively for 701
            return None
        return processors.to_float
class _PGNumericNoBind(_PGNumeric):
    """``_PGNumeric`` variant that installs no bind-parameter processor."""

    def bind_processor(self, dialect):
        """Return ``None`` so bound values are handed over unconverted."""
        return None
class _PGJSON(JSON):
    """JSON type that skips result decoding on sufficiently new pg8000."""

    def result_processor(self, dialect, coltype):
        """Return the result converter for JSON columns, or ``None``.

        When ``dialect._dbapi_version`` is greater than ``(1, 10, 1)`` no
        processor is returned (the driver has native JSON support);
        otherwise the parent ``JSON`` type's processor is used.
        """
        has_native_json = dialect._dbapi_version > (1, 10, 1)
        if not has_native_json:
            return super(_PGJSON, self).result_processor(dialect, coltype)
        return None  # Has native JSON
class PGExecutionContext_pg8000(PGExecutionContext):
    """Execution context for pg8000; adds nothing to ``PGExecutionContext``."""
class PGCompiler_pg8000(PGCompiler):
    """SQL compiler that doubles ``%`` signs in the statements it emits."""

    def visit_mod_binary(self, binary, operator, **kw):
        """Render a modulo expression with the operator written as ``%%``."""
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return lhs + " %% " + rhs

    def post_process_text(self, text):
        """Return ``text`` with every ``%`` doubled.

        A warning is emitted when ``text`` already contains ``%%``, since
        the doubling is applied to those characters as well.
        """
        already_doubled = '%%' in text
        if already_doubled:
            util.warn(
                "The SQLAlchemy postgresql dialect "
                "now automatically escapes '%' in text() "
                "expressions to '%%'."
            )
        return text.replace('%', '%%')
class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
    """Identifier preparer that also doubles ``%`` inside identifiers."""

    def _escape_identifier(self, value):
        """Escape quote characters in ``value``, then double each ``%``."""
        quote_escaped = value.replace(
            self.escape_quote, self.escape_to_quote)
        return quote_escaped.replace('%', '%%')
class PGDialect_pg8000(PGDialect):
    """PostgreSQL dialect that connects through the pg8000 DBAPI."""

    driver = 'pg8000'

    supports_unicode_statements = True

    supports_unicode_binds = True

    default_paramstyle = 'format'
    supports_sane_multi_rowcount = True
    execution_ctx_cls = PGExecutionContext_pg8000
    statement_compiler = PGCompiler_pg8000
    preparer = PGIdentifierPreparer_pg8000
    description_encoding = 'use_encoding'

    # Start from the base dialect's type map and substitute the
    # pg8000-specific numeric and JSON implementations defined in this module.
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.Numeric: _PGNumericNoBind,
            sqltypes.Float: _PGNumeric,
            JSON: _PGJSON,
            sqltypes.JSON: _PGJSON
        }
    )

    def __init__(self, client_encoding=None, **kwargs):
        """Create the dialect.

        :param client_encoding: if not ``None``, the encoding name that
            :meth:`on_connect` applies to each new connection via
            :meth:`set_client_encoding`.
        :param kwargs: passed through to ``PGDialect.__init__``.
        """
        PGDialect.__init__(self, **kwargs)
        self.client_encoding = client_encoding

    def initialize(self, connection):
        """Set driver-version-dependent flags, then run base initialization."""
        # Multi-row rowcount is only treated as reliable from pg8000 1.9.14.
        self.supports_sane_multi_rowcount = self._dbapi_version >= (1, 9, 14)
        super(PGDialect_pg8000, self).initialize(connection)

    @util.memoized_property
    def _dbapi_version(self):
        """The driver version as a tuple of ints.

        Parsed from the digit groups of ``self.dbapi.__version__``. When no
        DBAPI is present, or it has no ``__version__``, ``(99, 99, 99)`` is
        returned so that version comparisons treat the driver as recent.
        """
        if self.dbapi and hasattr(self.dbapi, '__version__'):
            return tuple(
                [
                    int(x) for x in re.findall(
                        r'(\d+)(?:[-\.]?|$)', self.dbapi.__version__)])
        else:
            return (99, 99, 99)

    @classmethod
    def dbapi(cls):
        """Import and return the ``pg8000`` module."""
        return __import__('pg8000')

    def create_connect_args(self, url):
        """Build the ``(args, kwargs)`` pair used to open a connection.

        The URL's fields are translated with ``username`` renamed to
        ``user``, ``port`` is converted to ``int`` when present, and the
        URL's query-string entries are merged in last.

        :return: ``([], opts)`` - no positional arguments, keyword options.
        """
        opts = url.translate_connect_args(username='user')
        if 'port' in opts:
            opts['port'] = int(opts['port'])
        opts.update(url.query)
        return ([], opts)

    def is_disconnect(self, e, connection, cursor):
        """Return True if the text of ``e`` says the connection is closed."""
        return "connection is closed" in str(e)

    def set_isolation_level(self, connection, level):
        """Apply an isolation level (or autocommit) to ``connection``.

        :param connection: a DBAPI connection, or a wrapper exposing the
            DBAPI connection as ``.connection``.
        :param level: the isolation level name; underscores are treated as
            spaces.  ``AUTOCOMMIT`` switches the connection's ``autocommit``
            flag on; any name in ``self._isolation_lookup`` switches it off
            and issues ``SET SESSION CHARACTERISTICS``.
        :raises exc.ArgumentError: if ``level`` is neither ``AUTOCOMMIT``
            nor a key of ``self._isolation_lookup``.
        """
        level = level.replace('_', ' ')

        # adjust for ConnectionFairy possibly being present
        if hasattr(connection, 'connection'):
            connection = connection.connection

        if level == 'AUTOCOMMIT':
            connection.autocommit = True
        elif level in self._isolation_lookup:
            connection.autocommit = False
            cursor = connection.cursor()
            # Close the cursor even when a statement fails, so an error
            # here does not leave it open.
            try:
                cursor.execute(
                    "SET SESSION CHARACTERISTICS AS TRANSACTION "
                    "ISOLATION LEVEL %s" % level)
                cursor.execute("COMMIT")
            finally:
                cursor.close()
        else:
            raise exc.ArgumentError(
                "Invalid value '%s' for isolation_level. "
                "Valid isolation levels for %s are %s or AUTOCOMMIT" %
                (level, self.name, ", ".join(self._isolation_lookup))
            )

    def set_client_encoding(self, connection, client_encoding):
        """Issue ``SET CLIENT_ENCODING`` on ``connection`` and commit.

        :param connection: a DBAPI connection, or a wrapper exposing the
            DBAPI connection as ``.connection``.
        :param client_encoding: the encoding name; it is placed into the
            statement text as-is.
        """
        # adjust for ConnectionFairy possibly being present
        if hasattr(connection, 'connection'):
            connection = connection.connection

        cursor = connection.cursor()
        # Close the cursor even when a statement fails, so an error here
        # does not leave it open.
        try:
            cursor.execute(
                "SET CLIENT_ENCODING TO '" + client_encoding + "'")
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    def do_begin_twophase(self, connection, xid):
        """Begin a two-phase transaction identified by ``xid``."""
        connection.connection.tpc_begin((0, xid, ''))

    def do_prepare_twophase(self, connection, xid):
        """Prepare the current two-phase transaction (``xid`` is unused)."""
        connection.connection.tpc_prepare()

    def do_rollback_twophase(
            self, connection, xid, is_prepared=True, recover=False):
        """Roll back the two-phase transaction identified by ``xid``.

        ``is_prepared`` and ``recover`` are accepted but not consulted.
        """
        connection.connection.tpc_rollback((0, xid, ''))

    def do_commit_twophase(
            self, connection, xid, is_prepared=True, recover=False):
        """Commit the two-phase transaction identified by ``xid``.

        ``is_prepared`` and ``recover`` are accepted but not consulted.
        """
        connection.connection.tpc_commit((0, xid, ''))

    def do_recover_twophase(self, connection):
        """Return the identifiers of recoverable two-phase transactions.

        Each entry from ``tpc_recover()`` contributes its second element,
        the same position ``xid`` occupies in the tuples this dialect
        passes to ``tpc_begin``.
        """
        return [row[1] for row in connection.connection.tpc_recover()]

    def on_connect(self):
        """Return a per-connection setup callable, or ``None``.

        The callable applies ``self.client_encoding`` and then
        ``self.isolation_level``, each only when it is not ``None``.  When
        neither is configured, ``None`` is returned.
        """
        fns = []

        if self.client_encoding is not None:
            def apply_client_encoding(conn):
                self.set_client_encoding(conn, self.client_encoding)
            fns.append(apply_client_encoding)

        if self.isolation_level is not None:
            def apply_isolation_level(conn):
                self.set_isolation_level(conn, self.isolation_level)
            fns.append(apply_isolation_level)

        if not fns:
            return None

        def on_connect(conn):
            for fn in fns:
                fn(conn)
        return on_connect
# Module-level alias for the dialect class defined above.
# NOTE(review): presumably the name SQLAlchemy's dialect loader looks up in
# this module - confirm against the loader before renaming.
dialect = PGDialect_pg8000
|