Skip to main content

Twisted Klein: Database Usage

twisted.enterprise.adbapi

Twisted, and subsequently Klein, allows asynchronous interaction with databases using blocking/sequential interfaces using adbapi. The only caveat is that the database interface must be DBAPI 2.0 compliant. For instance, if a PostgreSQL database needs to be accessed, then a typical interaction would be something like:

import psycopg2

connection = psycopg2.connect(database='Tutorial', user='admin', host='10.10.10.10')
cursor = connection.cursor()

cursor.execute("CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);")
cursor.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (100, "mydata"))
cursor.execute("SELECT * FROM test;")
connection.commit()
cursor.close()
connection.close()

Depending on environment settings, connecting to the database server, inserting data, or even querying for records can take an unpredictable amount of time, which can cause your application to slow down considerably. These unknown variables can be mitigated by using asynchronous functionality in the adbapi module. The asynchronous method is similar to the sequential code above, albeit using Deferreds and callbacks:

from twisted.internet import defer, reactor
from twisted.enterprise.adbapi import ConnectionPool

def createDB(cursor):
    cursor.execute("CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);")

def insert(cursor, num, data):
    cursor.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (num, data))

@defer.inlineCallbacks
def main():
    connection = ConnectionPool('psycopg2', database='Tutorial', user='admin', host='10.10.10.10')
    yield connection.runInteraction(createDB)
    yield connection.runInteraction(insert, 100, 'mydata')
    results = yield connection.runQuery('SELECT * FROM test')
    print(results)

main()
reactor.run()


ConnectionPool manages database connections. It takes the name of the database module as the first parameter, followed by all arguments that would get passed into the connect() function. Then when the database connection is required, it will be created in a separate thread so that the main thread isn’t being blocked. So for the examples above, psycopg2.connect(database, user, host) is the equivalent to ConnectionPool('psycopg2', database, user, host). After running each database interaction, the transaction will be committed, therefore explicit commit() calls aren’t required.

Other Modules

The base adbapi module is great for quick results, but many desire a bit more functionality. Luckily there are many database modules which support Twisted. This includes standard relational databases, as well as popular NoSQL options. There’s even a good ORM available! Perhaps in a future post, I’ll elaborate on specific modules.

Combine adbapi with Klein

WARNING! SQL injection is REAL and it can happen to you! The following example isn’t safe to use in production without proper security precautions, it’s just for demonstration purposes. Remember to sanitize user input, limit access to vital db functionality, and use stored procedures if possible. Be safe!
Let’s create an object that houses core database functionality required for a simple web app. The database of choice will be sqlite3 with a table called People.

from klein import Klein
from twisted.enterprise import adbapi

class Database(object):

    dbpool = adbapi.ConnectionPool('sqlite3', 'AsyncDB.sqlite', check_same_thread=False)
    table = 'People'


    def _createDB(self, cursor):
        create_stmt = 'CREATE TABLE %s (' \
            '_id_ INTEGER PRIMARY KEY,' \
            'first_name TEXT,' \
            'last_name TEXT,' \
            'age INTEGER' \
            ')' % (self.table)
        cursor.execute(create_stmt)

    def createDB(self):
        return self.dbpool.runInteraction(self._createDB)

    def _insert(self, cursor, first, last, age):
        insert_stmt = 'INSERT INTO %s (first_name, last_name, age) VALUES ("%s", "%s", %d)' % (self.table, first, last, age)
        cursor.execute(insert_stmt)

    def insert(self, first, last, age):
        return self.dbpool.runInteraction(self._insert, first, last, age)

    def queryAll(self):
        select_stmt = 'SELECT * FROM %s' % (self.table)
        return self.dbpool.runQuery(select_stmt)

Next, let’s create an object that holds the routes /create, /insert, /query along with the Database class created previously.

import json
from klein import Klein
from twisted.enterprise import adbapi
from twisted.internet import reactor

class WebApp(object):

    app = Klein()
    db = Database()

    #--------- Routes ---------#
    @app.route('/create')
    def createDB(self, request):
        d = self.db.createDB()
        d.addCallback(self.onSuccess, request, 'Successfully created db')
        d.addErrback(self.onFail, request, 'Failed to create db')
        return d

    @app.route('/insert', methods=['POST'])
    def insert(self, request):
        first_name = request.args.get('fname', [None])[0]
        last_name = request.args.get('lname', [None])[0]
        age = int(request.args.get('age', [0])[0])

        d = self.db.insert(first_name, last_name, age)
        d.addCallback(self.onSuccess, request, 'Insert success')
        d.addErrback(self.onFail, request, 'Insert failed')
        return d

    @app.route('/query', methods=['GET'])
    def queryAll(self, request):
        d = self.db.queryAll()
        d.addCallback(self.toJSON, request)
        d.addErrback(self.onFail, request, 'Failed to query db')
        return d


    #---------- Callbacks -----------#
    def onSuccess(self, result, request, msg):
        request.setResponseCode(201)
        response = {'message': msg}
        return json.dumps(response)

    def onFail(self, failure, request, msg):
        request.setResponseCode(417)
        response = {'message': msg}
        return json.dumps(response)

    def toJSON(self, results, request):
        request.setHeader('Content-Type', 'application/json')
        responseJSON = []
        for record in results:
            mapper = {}
            mapper['id'] = record[0]
            mapper['first_name'] = record[1].encode('utf-8')
            mapper['last_name'] = record[2].encode('utf-8')
            mapper['age'] = record[3]
            responseJSON.append(mapper)
        return json.dumps(responseJSON)

if __name__ == '__main__':
    webapp = WebApp()
    webapp.app.run('localhost', 9000)

The /create endpoint needs to be accessed first so that a database can be created. A person’s first name, last name, and age need to be passed in as form data to the /insert endpoint. Finally the results can be queried and represented in JSON from the /query endpoint.

curl -v localhost:9000/create
curl -v -X POST -d fname=Tom\&lname=Brady\&age=39 localhost:9000/insert
curl -X GET localhost:9000/query | python -m json.tool | less

References

Comments

Post a Comment

Popular posts from this blog

Twisted Klein: Non-Blocking Recipes

Non-Blocking Recipes Do you like expressjs , but don’t want to switch to Node.js? Want non-blocking execution in Python? Then look no further! Asynchronous execution is the very essence of what makes Klein a contender in todays web framework landscape. It’s also the most difficult concept to grasp since most Pythonistas are not accustomed to asynchronous programming. Hopefully with the addition asyncio to Python’s standard library, this will change. Klein is built atop Twisted and developers can expose Deferred objects for asynchronous behavior. A very brief overview will be given on Twisted Deferred , afterwards aspiring developers are encouraged to read the Twisted documents on the subject (provided in the links near the bottom). Deferred Overview To demonstrate how Deferred objects work, we will create a chain of callback functions that execute after a result is available. Don’t worry if this confuses you right now, the code will clear things up. from __future_

Simple self signed TLS server and client using Twisted

Self signed TLS server and client using Twisted Prerequisites openssl twisted & pyOpenSSL modules- pip install twisted[tls] treq module - pip install treq Basic knowledge of Twisted TCP servers/clients Generate self signed certificate Generate the server's private key using a secret, which is SuperSecretPassword in this case. openssl genrsa -aes256 -passout pass:SuperSecretPassword -out server.key 2048 Perform a CSR (certificate signing request). Ensure the FQDN (fully qualified domain name) matches the hostname of the server, otherwise the server won't be properly validated. openssl req -new -key server.key -passin pass:SuperSecretPassword -out server.csr # Common Name (e.g. server FQDN or YOUR name) []:localhost openssl x509 -req -passin pass:SuperSecretPassword -days 1024 -in server.csr -signkey server.key -out server.crt For development purposes, remove the password from the certificate. openssl rsa -in server.key -out server_no_pass.key -passin pas

Python alias commands that play nice with virtualenv

There are plenty of predefined Python executables, symlinks, and aliases that come bundled with your operating system. These commands come in very handy because it saves you from typing out long commands or chain of scripts. However the downfall of operating system aliases is that they don’t always play nice with virtualenv (or venv if you’re on Python 3). Most predefined aliases use the system’s default Python as the interpreter, which is next to useless when your application runs in a virtual environment. Over the years, I’ve come up with my own Python aliases that play nice with virtual environments. For this post, I tried to stay as generic as possible such that any alias here can be used by every Pythonista. In other words, there will be no aliases for specific frameworks such as running a Django server or starting a Scrappy spider. The following is one of my bash scripts I source: .py-aliases #----- Pip -----# alias pip-list="pip freeze | less" alias pip-search