Python 提供了通过简单的 API 与数据库进行交互的支持。Python 附带的模块包括用于 SQLite 和 Berkeley DB 的模块。对于 MySQL、PostgreSQL、FirebirdSQL 等数据库,有第三方模块可供使用。这些第三方模块需要下载并安装后才能使用。例如,可以通过 Debian 包 "python-mysqldb" 安装 MySQLdb。

数据库管理系统(DBMS)具体示例

MySQL

使用 MySQL 的示例如下:

import MySQLdb
db = MySQLdb.connect("host machine", "dbuser", "password", "dbname")
cursor = db.cursor()
query = """SELECT * FROM sampletable"""
lines = cursor.execute(query)
data = cursor.fetchall()
db.close()

在第一行中,导入了 MySQLdb 模块。然后,设置与数据库的连接,并在第 4 行将要执行的实际 SQL 语句存储在变量 query 中。在第 5 行,我们执行查询,在第 6 行获取所有数据。执行完这段代码后,lines 包含了获取的行数(例如,sampletable 表中的行数)。data 变量包含所有实际的数据,例如 sampletable 的内容。最后,数据库连接将被关闭。如果行数较大,建议使用 row = cursor.fetchone() 并逐行处理数据:

# 前 5 行与上面相同
while True:
    row = cursor.fetchone()
    if row == None: break
    # 对此行数据进行处理
db.close()

显然,在处理每行数据时必须进行某种数据处理,否则数据将不会被存储。fetchone() 命令的结果是一个元组(Tuple)。

为了简化连接的初始化,可以使用配置文件

import MySQLdb
db = MySQLdb.connect(read_default_file="~/.my.cnf")
...

这里,~/.my.cnf 文件包含了 MySQL 所需的配置信息。

SQLite

SQLite 的示例如上面非常相似,游标(cursor)提供了许多相同的功能:

import sqlite3
db = sqlite3.connect("/path/to/file")
cursor = db.cursor()
query = """SELECT * FROM sampletable"""
lines = cursor.execute(query)
data = cursor.fetchall()
db.close()

在向数据库写入数据时,必须记得调用 db.commit(),否则更改不会被保存:

import sqlite3
db = sqlite3.connect("/path/to/file")
cursor = db.cursor()
query = """INSERT INTO sampletable (value1, value2) VALUES (1,'test')"""
cursor.execute(query)
db.commit()
db.close()

PostgreSQL

使用 PostgreSQL 的示例:

import psycopg2
conn = psycopg2.connect("dbname=test")
cursor = conn.cursor()
cursor.execute("select * from test")
for i in cursor.next():
    print(i)
conn.close()

Firebird

使用 Firebird 的示例:

import firebirdsql
conn = firebirdsql.connect(dsn='localhost/3050:/var/lib/firebird/2.5/test.fdb', user='alice', password='wonderland')
cur = conn.cursor()
cur.execute("select * from baz")
for c in cur.fetchall():
    print(c)
conn.close()

一般原则

参数引用

你将经常需要将动态数据替换到查询字符串中。确保这一步做得正确非常重要。

切勿这样做!

result = db.execute("SELECT name FROM employees WHERE location = '" + location + "'")

这个示例是错误的,因为它没有正确处理在字符串中替换时的特殊字符,比如单引号。如果你的代码必须处理潜在恶意用户(比如在面向公众的 Web 服务器上),这可能会使你容易受到 SQL 注入攻击。

对于简单情况,可以使用 execute 方法提供的自动参数替换,例如:

result = db.execute("SELECT name FROM employees WHERE location = ?", [location])

DBMS 接口本身会自动将你传递的值转换为正确的 SQL 语法。

对于更复杂的情况,DBMS 模块应该提供一个可以显式调用的引用函数。例如,MySQLdb 提供了 escape_string 方法,而 APSW(用于 SQLite3)提供了 format_sql_value。这是在查询结构需要更加动态的情况下使用的:

criteria = [("company", company)]  # 字段名和字段值的元组列表
if department != None:
    criteria.append(("department", department))
# ... 适当时附加其他可选条件 ...

result = db.execute(
        "SELECT name FROM employees WHERE "
    +
        " and ".join(
            "%s = %s" % (criterion[0], MySQLdb.escape_string(criterion[1]))
            for criterion in criteria
          )
  )

这会根据用户填写的字段动态构造查询,比如 “select name from employees where company = 'some company'” 或 “select name from employees where company = 'some company' and department = 'some department'”。

使用迭代器

Python 迭代器非常适合用于迭代大量数据库记录。以下是一个执行数据库查询并返回结果迭代器的函数示例,而不是一次性返回所有结果。它依赖于 APSW(Python 3 接口库)中 cursor.execute 方法本身返回一个迭代器。

def db_iter(db, cmd, mapfn = lambda x : x):
    "在数据库连接 db 上执行 cmd,并逐个返回结果"
    cu = db.cursor()
    result = cu.execute(cmd)
    while True:
        yield mapfn(next(result))

使用该函数的示例:

for artist, publisher in db_iter(
        db = db,
        cmd = "SELECT artist, publisher FROM artists WHERE location = %s" % apsw.format_sql_value(location)
):
    print(artist, publisher)

for location in db_iter(
        db = db,
        cmd = "SELECT DISTINCT location FROM artists",
        mapfn = lambda x : x[0]
):
    print(location)

在第一个示例中,由于 db_iter 返回一个元组,因此可以直接将其分配给记录字段的单独变量。在第二个示例中,元组只有一个元素,因此使用自定义的 mapfn 来提取该元素并返回,而不是返回整个元组。

永远不要在脚本中使用 "SELECT *"

数据库表结构经常会发生变化。随着应用需求的发展,字段甚至整个表可能会添加或删除。考虑以下语句:

result = db.execute("select * from employees")

你可能知道 employees 表当前包含 4 个字段。但明天可能会有人添加第五个字段。你记得更新代码来处理这个变化吗?如果没有,它很可能会崩溃。甚至更糟的是,可能会产生错误的结果!

最好总是列出你感兴趣的特定字段,不管字段有多少:

result = db.execute("select name, address, department, location from employees")

这样,任何添加的额外字段都会被忽略。如果删除了任何命名字段,代码至少会因运行时错误而失败,这会提醒你忘记更新代码。

在字段分隔时进行循环

考虑以下场景:你的销售公司数据库有一个员工表,还有一个每个员工销售记录的表。你想遍历这些销售记录,并为每个员工生成一些统计信息。一种简单的做法可能是:

  1. 查询数据库获取员工列表
  2. 对每个员工,执行查询获取该员工的销售记录

如果员工很多,那么第一次查询可能会产生一个大列表,第二步会涉及大量的数据库查询。

事实上,整个处理过程可以通过一个 SQL 查询来完成,使用标准 SQL 构造——连接(join)。

注意: SQL 编程是一个独立的专业技能。要了解更多,请从 Wikipedia 的文章开始。

以下是这种循环的示例:

rows = db_iter(
  db = db,
  cmd = "select employees.name, sales.amount, sales.date from"
        " employees left join sales on employees.id = sales.employee_id"
        " order by employees.name, sales.date"
)
prev_employee_name = None
while True:
    row = next(rows, None)
    if row != None:
        employee_name, amount, date = row
    if row == None or employee_name != prev_employee_name:
        if prev_employee_name != None:
            # 完成当前员工的统计
            report(prev_employee_name, employee_stats)
        if row == None:
            break
        # 开始新的员工统计
        prev_employee_name = employee_name
        employee_stats = {"total_sales" : 0, "number_of_sales" : 0}
        if date != None:
            employee_stats["earliest_sale"] = date
    # 该员工的另一个统计
    if amount != None:
        employee_stats["total_sales"] += amount
        employee_stats["number_of_sales"] += 1
    if date != None:
        employee_stats["latest_sale"] = date

这里的统计非常简单:最早和最晚的销售记录、销售次数和总销售额,也可以直接在 SQL 查询中计算。但相同的循环可以计算更复杂的统计信息(如标准差),这些是无法在简单的 SQL 查询中表示的。

请注意,每个员工的统计信息是在以下两种情况下写出的:

  1. 下一条记录的员工名字与当前记录不同
  2. 查询结果结束

这两种条件是通过 row == Noneemployee_name != prev_employee_name 来测试的;在写出员工统计信息后,使用单独的检查 row == None 来终止循环。如果循环没有终止,则为新员工初始化处理。

此外,还使用了左连接(left join):如果某个员工没有销售记录,连接将返回该员工的单行数据,销售表中的字段值为 SQL 的空值(在 Python 中表示为 None)。这就是为什么在处理这些字段之前需要检查 None 的原因。

我们还可以使用内连接(inner join),这将不返回没有销售记录的员工。是否要将这些员工从报告中排除,或者将它们包含并显示零的总数,取决于你的应用需求。

Last modified: Friday, 31 January 2025, 1:39 AM