Python函数式编程
🐍 Python中的函数式编程
“函数式编程为创建代码简洁明了的软件提供了许多技术。虽然Python 不是纯粹的函数式语言,但仍然可以使用Python 进行函数式编程。”
函数式编程概述
编程范式
编程范式是指编程的基本风格和方法。函数式编程是其中的一种,它强调使用函数来处理数据。
函数式编程的特点
头等函数:函数可以作为参数传递,并且可以被返回。
不可变数据结构:数据一旦创建便不可改变,减少了副作用。
严格求值与非严格求值:控制表达式求值的时机,影响程序性能。
用递归代替循环语句:通过递归来实现重复操作,减少可变状态。
函数类型系统:支持在类型系统中使用函数。
经典示例
使用函数式编程可以简化算法实现,例如在数据分析中广泛使用的函数式工具。
Python中的函数
头等对象
在Python中,函数被视为头等对象,可以赋值给变量、作为参数传递以及作为返回值。
纯函数与高阶函数
纯函数:给定相同输入,总是返回相同输出,不产生副作用。
高阶函数:接受函数作为参数或返回函数的函数。
生成器与迭代器
生成器表达式:允许在内存中逐个生成数据,避免创建大型数据结构,降低资源消耗,提高执行速度。
迭代器:提供了一种访问集合元素的方法,而无需暴露集合的内部结构。
使用集合
内置函数
Python提供了内置函数如any()和all(),可以将集合转换为单个值。
示例
函数 描述
any() 如果集合中至少有一个元素为真,则返回True
all() 如果集合中的所有元素都为真,则返回True
高阶函数
常用高阶函数
map()函数:将函数应用于每个元素。
filter()函数:根据条件过滤元素。
示例代码

使用map()和filter()的示例

numbers = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x**2, numbers))
evens = list(filter(lambda x: x % 2 == 0, numbers))
递归与归约
递归:使用函数自身来解决问题。
归约:将集合中的元素组合成单个值。
示例

计算列表元素的和

def sum_list(lst):
if not lst:
return 0
return lst[0] + sum_list(lst[1:])
总结
函数式编程在Python中提供了一种清晰优雅的编程方式,能够提高代码的可读性和维护性。通过理解函数、迭代器、生成器及高阶函数的使用,开发者可以更有效地利用Python进行编程。
📊 编程范式
函数式编程概述
“函数式编程通过在函数中定义表达式和对表达式求值完成计算。它尽量避免由于状态变化和使用可变对象引入复杂性,让程序变得简洁明了。”
编程范式
编程范式并没有统一的划分标准,本书重点分析其中两个范式:函数式编程和命令式编程。
特征 函数式编程 命令式编程
状态管理 避免状态变化,使用函数求值 通过变量值反映计算状态
语句特性 函数的组合,易于理解 通过命令改变状态,复杂的流程控制
理想状态
在理想状态下,每一条语句通过改变状态,推动计算从初始状态向期望的最终结果不断靠近。需要首先定义出最终状态,并找到能达到该状态的语句。
函数式语言的优点
使用“对函数求值”代替改变变量值的“状态”。
创建新对象而非修改现有对象。
提高可视化程度,便于编写单元测试用例。
细分过程范式
命令式编程可以再细分为多个子类别,本节简单介绍过程式编程和面向对象编程的区别。
过程式编程与面向对象编程
过程式编程:侧重于过程或算法的执行。
面向对象编程:侧重于对象的定义和操作。
特征 过程式编程 面向对象编程
结构 通过函数和过程 通过对象和类
数据管理 通过全局变量 数据封装在对象内部
示例代码
s = 0
for n in range(1, 10):
if n % 3 == 0:
s += n
该示例展示了如何使用循环结构对一组属性相同的数进行处理。
🐍 Python 的过程式与面向对象编程

  1. 程序状态与变量
    在 Python 中,程序的状态通常由变量定义,例如变量 s 和 n。变量 n 的取值范围是从 1 到 10,每次循环中 n 的值依次增加,循环在 n == 10 时结束。下面是一个示例程序,计算 1 到 10 中 3 或 5 的倍数之和:
    s = 0
    for n in range(1, 11):
    if n % 3 == 0 or n % 5 == 0:
    s += n
    print(s)
  2. 面向对象编程示例
    在面向对象编程中,我们可以使用列表对象来存储状态,下面是一个使用 list 的示例程序:
    m = list()
    for n in range(1, 10):
    if n % 3 == 0 or n % 5 == 0:
    m.append(n)
    print(sum(m))
    注意:这个程序的结果与前面的结果相同,但它内部维护了一个状态可变的集合对象 m。
  3. Python 的面向对象特性
    Python 是一种纯粹的面向对象语言,没有原始数据类型,所有数据类型都是对象。以下是一个自定义子类的示例,用于实现求和功能:
    class Summable_List(list):
    def sum(self):
    s = 0
    for v in self:
    s += v
    return s
    使用 Summable_List 初始化变量 m,可以用 m.sum() 方法替代 sum(m) 方法来对 m 求和。
  4. 函数式编程概述
    在函数式编程中,求 3 或 5 的倍数的过程可分为两部分:
    对一系列数值求和。
    生成一个满足某个条件的序列。
    4.1 递归求和
    递归求和的定义如下:
    def sumr(seq):
    if len(seq) == 0:
    return 0
    return seq[0] + sumr(seq[1:])
    4.2 生成符合条件的序列
    使用以下函数生成 3 或 5 的倍数:
    def until(n, filter_func, v):
    if v == n:
    return []
    if filter_func(v):
    return [v] + until(n, filter_func, v + 1)
    else:
    return until(n, filter_func, v + 1)
    4.3 使用 lambda
    定义一个用于筛选数值的 lambda 对象:
    mult_3_5 = lambda x: x % 3 == 0 or x % 5 == 0
    结合 until 函数,可以生成一系列 3 或 5 的倍数:
    until(10, lambda x: x % 3 == 0 or x % 5 == 0, 0)
    4.4 使用生成器表达式
    可以使用生成器表达式实现混合型函数:
    print(sum(n for n in range(1, 10) if n % 3 == 0 or n % 5 == 0))
  5. 对象创建过程
    在 Python 中,创建对象的过程可以通过函数和类来实现。在编程中,计算路径并不是唯一的,可能会有多种计算方式得到相同的结果。
    5.1 计算示例
    例如,使用不同的计算路径:
    1 + 2 + 3 + 4 # 结果为 10
    ((1 + 2) + 3) + 4 # 结果为 10
    1 + (2 + (3 + 4)) # 结果为 10
  6. 函数式编程的经典示例
    使用 Newton-Raphson 算法求解平方根函数的示例:
    def next_(n, x):
    return (x + n / x) / 2
    示例代码生成一系列近似值并使用 yield 生成:
    def repeat(f, a):
    yield a
    for v in repeat(f, f(a)):
    yield v
    结合这些函数,可以创建求平方根的函数:
    def sqrt(a0, ε, n):
    return within(ε, repeat(lambda x: next_(n, x), a0))
  7. 小结
    本章介绍了 Python 的编程范式,包括过程式、面向对象和函数式编程,并分析了各自的特点和应用场景。Python 作为一种多范式编程语言,灵活地结合了这三种编程风格,适用于不同的编程需求。
    🧠 函数式编程的特点
  8. 函数对象的属性
    在Python中,函数是头等对象,可以像其他对象一样处理。以下是一个函数对象的示例:

example.code.co_varnames
(‘a’, ‘b’, ‘kw’)
example.code.co_argcount
2
__code__对象包含函数的属性,如变量名和参数数量。

  1. 纯函数
    纯函数是指没有副作用的函数,其优势包括:
    可以通过改变求值顺序实现优化
    概念简单,便于测试
    编写纯函数的要求
    作用域为本地
    避免使用global和nonlocal语句
    示例:使用lambda表达式
    以下是一个使用lambda表达式创建的纯函数示例:

mersenne = lambda x: 2 ** x - 1
mersenne(17)
131071

  1. 高阶函数
    高阶函数是指以其他函数为参数或返回值的函数。使用高阶函数可以使程序更加简洁明了。
    示例:使用max()函数
    以下示例演示如何使用max()函数与lambda表达式结合:
    year_cheese = [(2000, 29.87), (2001, 30.12), (2002, 30.6), (2003, 30.66), (2004, 31.33), (2005, 32.62), (2006, 32.73), (2007, 33.5), (2008, 32.84), (2009, 33.02), (2010, 32.92)]

max(year_cheese)
(2010, 32.92)
max(year_cheese, key=lambda yc: yc[1])
(2007, 33.5)

  1. 不可变数据结构
    在函数式编程中,使用不可变对象(如元组)构建复杂的数据结构是常见做法。
    性能优势
    不可变对象的性能通常优于可变对象。
    使用不可变对象时,算法设计需考虑避免改变对象的开销。
    设计模式:打包-处理-拆包
    以下是处理不可变对象的设计模式示例:

max(map(lambda yc: (yc[1], yc), year_cheese))[1]
(2007, 33.5)

  1. 严格求值与非严格求值
    惰性求值
    Python的逻辑运算符and、or和if-then-else是非严格的。以下是示例:

0 and print(“right”)
0
True and print(“right”)
right
生成器表达式
生成器函数支持惰性求值,示例:
def numbers():
for i in range(1024):
print(f"= {i}")
yield i

  1. 用递归代替循环语句
    函数式编程通常使用递归而非循环。以下是判断质数的递归示例:
    def isprimer(n: int) -> bool:
    def isprime(k: int, coprime: int) -> bool:
    “”“Is k relatively prime to the value coprime?”“”
    if k < coprime * coprime: return True
    if k % coprime == 0: return False
    return isprime(k, coprime + 2)
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    return isprime(n, 3)
  2. 函数类型系统
    Python 3引入了类型提示功能,以提高代码质量。示例:
    def sum_to(n: int) -> int:
    sum: int = 0
    for i in range(n):
    sum += i
    return sum
  3. 小结
    本章讨论了函数式编程的核心特征,包括头等对象、高阶函数、纯函数、不可变数据结构、严格求值与非严格求值以及递归代替循环的理念。这些特征在Python编程中得到了广泛的应用。
    🐍 Python 中的函数式编程
  4. 不可变数据结构与状态变化
    在Python等面向对象的语言中,不可变数据结构显得有些另类,但从函数式编程的角度来看,状态变化是许多问题的根源。使用不可变数据结构有助于我们直击问题的核心。
  5. 求值策略
    Python使用严格求值策略,即按照从左向右的顺序对语句中的表达式求值。然而,在处理逻辑运算符(如or、and和if-else)时,Python采用非严格求值(又称“积极求值”和“惰性求值”)。
    积极求值:对所有部分进行求值。
    惰性求值:仅对已知部分求值。
    生成器函数也不是严格求值的,Python通过生成器函数实现惰性求值。
  6. 递归与循环
    函数式语言通常使用递归代替循环语句,但Python对递归有一些限制,如调用栈长度限制和缺乏优化编译器。我们需要手动优化递归函数。
  7. 动态类型系统
    Python使用动态类型系统,虽然有时需要手动转换类型或创建专门的类来应对复杂情形,但大多数情况下,利用Python现有的规则能够很好地解决问题。
  8. 纯函数
    纯函数是函数式编程的核心,没有副作用,返回结果仅与参数有关。例如:
    def m(n: int) -> int:
    return 2 ** n - 1
    5.1 避免副作用
    避免副作用意味着减少对变量赋值的依赖。通过将全局变量的引用用参数替代,可以使函数更纯。
  9. 函数作为第一类对象
    Python中的函数是第一类对象,可以赋值给变量、作为参数传递或作为返回值。以下是一个示例:
    from typing import Callable

class Mersenne1:
def init(self, algorithm: Callable[[int], int]) -> None:
self.pow2 = algorithm

def __call__(self, arg: int) -> int:
    return self.pow2(arg) - 1
  1. 使用元组和命名元组
    Python的元组也是不可变对象,适合函数式编程。命名元组允许通过属性名称引用属性,增加了可读性。
    from collections import namedtuple

Color = namedtuple(“Color”, (“red”, “green”, “blue”, “name”))
8. 生成器与生成器表达式
生成器和生成器表达式是处理集合对象的重要工具,支持惰性求值。生成器表达式示例:
def pfactorsl(x: int):
if x % 2 == 0:
yield 2
if x // 2 > 1:
yield from pfactorsl(x // 2)
return
for i in range(3, int(math.sqrt(x) + .5) + 1, 2):
if x % i == 0:
yield i
if x // i > 1:
yield from pfactorsl(x // i)
return
yield x
9. 生成器的局限性
生成器表达式和生成器函数存在局限性,例如,生成器对象没有长度:
len(pfactorsl(1560)) # 将引发 TypeError
10. 使用字符串
Python的字符串是不可变的,适合函数式编程。字符串方法通常是纯函数,以下是字符串清洗的示例:
from decimal import *

def clean_decimal(text: str) -> Optional[str]:
if text is None: return None
return Decimal(text.replace(“$”, “”).replace(“,”, “”))
使用函数式编程的方法,可以使代码更简洁和一致。
🐍 生成器函数的特性与应用
生成器的惰性特性
“生成器是惰性的,在外部对其求值之前,并不进行实际的计算。”
生成器的局限性
长度限制:生成器不能使用 len() 函数来获取其长度,因为在所有值都取完之前,无法知道值的总数。
单次使用:生成器只能使用一次。例如:
result = pfactorsl(1560)
sum(result) # 第一次使用
sum(result) # 第二次使用会返回0,因为生成器为空
itertools.tee() 函数
生成器函数的单次使用限制可以通过 itertools.tee() 函数来弥补。此函数创建生成器的副本,便于多次使用。
示例代码
import itertools
from typing import Iterable, Any

def limits(iterable: Iterable[Any]) -> Any:
max_tee, min_tee = itertools.tee(iterable, 2)
return max(max_tee), min(min_tee)
数据清洗与生成器函数
在数据清洗过程中,常常需要将输入数据转换为可用的数据集合。以下是一个示例数据集:安斯库姆四重奏(Anscombe’s quartet)。
示例数据集
I II III IV
x y x y
10.0 8.04 10.0 9.14
8.0 6.95 8.0 8.14
13.0 7.58 13.0 8.74
使用 csv 模块处理数据
import csv
from typing import IO, Iterator, List, Text, Union, Iterable

def row_iter(source: IO) -> Iterator[List[Text]]:
return csv.reader(source, delimiter=“\t”)
过滤非数据行
为了清洗数据,需要移除非数据的行,例如数据集的标题行。
移除标题行的函数
def head_split_fixed(row_iter: Iterator[List[Text]]) -> Iterator[List[Text]]:
title = next(row_iter)
assert (len(title) == 1 and title[0] == “Anscombe’s quartet”)
heading = next(row_iter)
assert (len(heading) == 4 and heading == [‘I’, ‘II’, ‘III’, ‘IV’])
columns = next(row_iter)
assert (len(columns) == 8 and columns == [‘x’,‘y’, ‘x’,‘y’, ‘x’,‘y’, ‘x’,‘y’])
return row_iter
组合生成器表达式
生成器表达式可以通过创建复合函数来组合,从而描述复杂的处理流程。
组合示例
使用生成器表达式进行组合:
g_f_x = (g(f(x)) for x in range())
嵌套生成器表达式:
g_f_x = (g(y) for y in (f(x) for x in range()))
利用变量复用
f_x = (f(x) for x in range())
g_f_x = (g(y) for y in f_x)
统计数据处理
使用高阶函数(如 map() 和 filter())进行数据转换和抽取可以提高代码的简洁性。
统计示例
计算 y 的平均值的代码示例:
mean = (
sum(float(pair[1]) for pair in sample_I) / len(sample_I)
)
创建命名元组
命名元组可以提高代码的可读性。例如:
from collections import namedtuple
Color = namedtuple(“Color”, (“red”, “green”, “blue”, “name”))
总结
生成器函数和表达式在函数式编程中具有重要的应用,尤其是在数据处理和清洗过程中。通过组合生成器表达式,可以实现复杂的数据处理逻辑,同时保持代码的简洁性。
📚 使用集合
3.6.3 使用有状态的集合
Python 提供了多种有状态的集合,包括 set 集合。set 有两个主要的使用场景: - 用有状态的 set 收集数据 - 用不可变的 frozenset 优化数据搜索算法
frozenset 的创建
可以通过可迭代对象创建 frozenset,也可以使用 frozenset(some_iterable) 方法。
小结
本章重点介绍了如何编写没有副作用的纯函数。实现这一点并不难,因为 Python 要求在写不纯粹的函数时必须使用 global 语句。然后探讨了生成器函数,以及如何以其为基础进行函数式编程。还介绍了 Python 内置的集合类,以及如何在函数范式中使用它们。函数式编程整体上尽量避免使用有状态的变量,然而集合数据类型往往是有状态的,而且许多算法正是利用了集合有状态的特性,因此在使用 Python 的非函数特性时,务必考虑清楚。
接下来的两章主要讨论高阶函数:以函数为参数,或者返回另一个函数的函数。首先会介绍 Python 内置的高阶函数,然后会介绍自定义高阶函数的相关技术,以及 itertools 模块和 functools 模块中的高阶函数。
🛠️ 使用集合的函数概览
Python 提供了很多能处理集合的函数,可以用于序列(列表或元组)、set、映射,以及生成器表达式返回的可迭代对象。本章将从函数式编程的视角研究 Python 的集合处理函数。
函数分类
函数大体可分为以下两类: - 标量函数:作用于单个值,并返回单个值,例如 abs()、pow() 以及整个 math 模块中的函数。 - 集合函数:作用于可迭代集合。
集合函数又可以细分为以下三类: - 归约函数:通过指定函数将集合内元素汇聚在一起,生成单个值作为结果。 - 映射函数:将标量函数作用于集合的每个元素,作为结果返回的集合与输入集合长度相同。 - 过滤函数:将标量函数作用于集合的每个元素,保留其中一部分元素,舍弃另一部分元素。
4.2 使用可迭代对象
在 Python 中处理集合时,经常使用 for 循环语句。当处理实例化的集合数据(例如元组、列表、映射与 set)时,for 循环中包含了显式的状态管理。虽然这种用法不符合函数式编程的原则,但反映出它是 Python 的一种必要的优化手段。
在函数式语境下,for 循环迭代处理常用于 unwrap(process(wrap(iterable))) 设计模式。wrap() 函数将原始可迭代对象中的每个元素转换为一个二元组。
示例:提取元素
定义以下两个函数来从元组中提取第一个和第二个元素:
fst = lambda x: x[0]
snd = lambda x: x[1]
解析 XML 文件
通过解析一个 XML(可扩展标记语言)文件获得原始经纬度数据。该过程将展示封装 Python 中不那么函数式的一些语言特征,来生成一组可迭代序列的方法。
import xml.etree.ElementTree as XML
from typing import Text, List, TextIO, Iterable

def row_iter_kml(file_obj: TextIO) -> Iterable[List[Text]]:
ns_map = {
“ns0”: “http://www.opengis.net/kml/2.2”,
“ns1”: “http://www.google.com/kml/ext/2.2”}
path_to_points= (“./ns0:Document/ns0:Folder/ns0:Placemark/”
“ns0:Point/ns0:coordinates”)
doc = XML.parse(file_obj)
return (comma_split(Text(coordinates.text))
for coordinates in
doc.findall(path_to_points, ns_map))
数据处理
解析后生成的是一组字符串元组,与 CSV 解析器生成的结果格式兼容,之后从 SQL 数据库中获取数据时,返回的结果也是类似的,这样就保证了上层处理函数能以相同的方法解析不同来源的数据。
组对序列元素
将点序列重组为“开始-结束”对组成的序列,是常用的数据处理步骤。
from typing import Iterator, Any
Item_Iter = Iterator[Any]
Pairs_Iter = Iterator[Tuple[float, float]]

def pairs(iterator: Item_Iter) -> Pairs_Iter:
def pair_from(head: Any, iterable_tail: Item_Iter) -> Pairs_Iter:
nxt = next(iterable_tail)
yield head, nxt
yield from pair_from(nxt, iterable_tail)

try:  
    return pair_from(next(iterator), iterator) 
except StopIteration: 
    return iter([]) 

此函数通过递归的方式将元素成对返回,确保了代码的复用性和可维护性。
🌀 循环优化与可迭代对象

  1. 函数定义与实现
    1.1 legs 函数
    legs 函数用于沿路径依次组对的实现,处理速度快且不受栈长度限制。
    from typing import Iterator, Any, Iterable, TypeVar
    T_ = TypeVar(‘T_’)
    Pairs_Iter = Iterator[Tuple[T_, T_]]

def legs(lat_lon_iter: Iterator[T_]) -> Pairs_Iter:
begin = next(lat_lon_iter)
for end in lat_lon_iter:
yield begin, end
begin = end
功能描述: 该函数生成的对序列为 list[0:1], list[1:2], list[2:3], …, list[-2:]。
适用性: 可以处理任何类型的序列,适用于各种可迭代对象。
1.2 类型变量
使用 TypeVar 定义的类型变量 T_ 用于描述输入输出类型一致性。
输入类型为某种类型 T_ 组成的迭代器,输出为由相同类型元素构成的元组。
2. 显式使用 iter() 函数
2.1 迭代器与可迭代对象
迭代器: 是一种用于遍历数据的对象,能够维护状态。
使用场景: 当使用 for 循环处理序列时,列表对象和可迭代对象的用法相同,但在某些情况下,如使用 next() 时,可能会出现不同。
2.2 next() 和 iter() 的示例

list(legs(x for x in range(3)))
[(0, 1), (1, 2)]
list(legs([0,1,2]))
Traceback (most recent call last):
TypeError: ‘list’ object is not an iterator
list(legs(iter([0,1,2])))
[(0, 1), (1, 2)]
示例分析:
第一个示例中,使用生成器表达式正确生成了路径段。
第二个示例中,直接传入列表导致错误,因为列表不是迭代器。
第三个示例中,通过 iter() 将列表转换为迭代器,成功生成路径段。

  1. 扩展简单循环
    3.1 过滤式扩展
    实现方案: 通过添加筛选规则,过滤掉某些数据。
    from typing import Iterator, Any, Iterable

Pairs_Iter = Iterator[Tuple[float, float]]
LL_Iter = Iterable[Tuple[Tuple[float, float], Tuple[float, float]]]

def legs_filter(lat_lon_iter: Pairs_Iter) -> LL_Iter:
begin = next(lat_lon_iter)
for end in lat_lon_iter:
if #some rule for rejecting:
continue
yield begin, end
begin = end
3.2 数据映射扩展
示例: 将字符串转换为浮点数。
trip = list(
legs(
(float(lat), float(lon))
for lat, lon in lat_lon_kml(row_iter_kml(source))
)
)
3.3 函数重构
float_from_pair 函数: 将数据转换过程参数化,以便复用。
from typing import Iterator, Tuple, Text, Iterable

Text_Iter = Iterable[Tuple[Text, Text]]
LL_Iter = Iterable[Tuple[float, float]]

def float_from_pair(lat_lon_iter: Text_Iter) -> LL_Iter:
return (
(float(lat), float(lon))
for lat, lon in lat_lon_iter
)
4. 归约与统计分析
4.1 使用 any() 和 all() 函数
功能: any() 和 all() 用于布尔归约,返回单个值(True 或 False)。
all(isprime(x) for x in SomeSet) # 所有元素为质数
any(not isprime(x) for x in someset) # 存在非质数元素
4.2 统计函数
len() 和 sum(): 提供简单的归约方法,计算序列中值的个数和汇总值。
def mean(items: Sequence) -> float:
return sum(items) / len(items)
示例: 计算算术平均值。
5. 代码优化实例
使用 float_from_pair 函数简化复杂表达式。
legs(
float_from_pair(
lat_lon_kml(row_iter_kml(source))
)
)
通过函数替换规则,简化了多层嵌套表达式。
6. 小结
通过上述方法,优化了循环、数据处理及归约操作,确保代码简洁且高效。
📊 统计函数与相关性计算
统计函数概述
在统计学中,使用一些基本的求和函数(如 s_0,s_1,s_2)来计算平均值和标准差。以下是这三个求和函数的定义:
def s0(samples: Sequence) -> float:
return sum(1 for x in samples) # 或者使用 len(data)

def s1(samples: Sequence) -> float:
return sum(x for x in samples) # 或者使用 sum(data)

def s2(samples: Sequence) -> float:
return sum(x * x for x in samples)
计算平均值与标准差
平均值和标准差的计算过程相对简单,标准差的计算稍复杂,但仍然可行。以下是计算平均值和标准差的函数实现:
def mean(samples: Sequence) -> float:
return s1(samples) / s0(samples)

def stdev(samples: Sequence) -> float:
N = s0(samples)
return sqrt((s2(samples) / N) - (s1(samples) / N) ** 2)
标准化计算
标准化是将样本值 x 转换为标准值 z 的过程,公式如下:
z(x,μ_x,σ_x)=(x-μ_x)/σ_x
其中 μ_x 是样本的平均值,σ_x 是样本的标准差。标准化后,约 2/3 的样本值会落在 ±1σ 范围内,超过 ±3σ 的概率小于 1/100。
示例代码
对样本 d=[2,4,4,4,5,5,7,9] 进行标准化:

d = [2, 4, 4, 4, 5, 5, 7, 9]
list(z(x, mean(d), stdev(d)) for x in d)
[-1.5, -0.5, -0.5, -0.5, 0.0, 0.0, 1.0, 2.0]
相关性计算
计算两个样本集之间的相关性可以通过标准化后的值进行。以下是相关性计算的函数实现:
def corr(samples1: Sequence, samples2: Sequence) -> float:
m_1, s_1 = mean(samples1), stdev(samples1)
m_2, s_2 = mean(samples2), stdev(samples2)
z_1 = (z(x, m_1, s_1) for x in samples1)
z_2 = (z(x, m_2, s_2) for x in samples2)
r = (sum(zx1 * zx2 for zx1, zx2 in zip(z_1, z_2)) / len(samples1))
return r
示例代码
计算身高和体重之间的相关性:

身高 (m)

xi = [1.47, 1.50, 1.52, 1.55, 1.57, 1.60, 1.63, 1.65,
… 1.68, 1.70, 1.73, 1.75, 1.78, 1.80, 1.83,]

体重 (kg)

yi = [52.21, 53.12, 54.48, 55.84, 57.20, 58.57, 59.93, 61.29,
… 63.11, 64.47, 66.28, 68.10, 69.92, 72.19, 74.46,]
round(corr(xi, yi), 5)
0.99458
使用 zip() 函数
zip() 函数用于将来自多个可迭代对象的数据交叉组合在一起。其基本用法如下:
list(zip(xi, yi))
[(1.47, 52.21), (1.5, 53.12), (1.52, 54.48), (1.55, 55.84),
(1.57, 57.2), (1.6, 58.57), (1.63, 59.93), (1.65, 61.29),
(1.68, 63.11), (1.7, 64.47), (1.73, 66.28), (1.75, 68.1),
(1.78, 69.92), (1.8, 72.19), (1.83, 74.46)]
特殊情况处理
没有输入参数:
zip()
<zip object at 0x101d62ab8>
list()
[]
只有一个输入参数:
zip((1, 2, 3))
<zip object at 0x101d62ab8>
list(
)
[(1,), (2,), (3,)]
输入序列长度不一致:
list(zip((1, 2, 3), (‘a’, ‘b’)))
[(1, ‘a’), (2, ‘b’)]
平铺序列
可以使用双层生成器表达式来平铺数据结构:
blocked = list(line.split() for line in file)
flat = (x for line in blocked for x in line)
list(flat)
结构化一维序列
将一维序列转换为复合序列的示例代码:
flat_iter = iter(flat)
(tuple(next(flat_iter) for i in range(5)) for row in range(len(flat)//5))
使用函数实现
def group_by_iter(n: int, iterable: Flat_Iter) -> Grouped_Iter:
row = tuple(next(iterable) for i in range(n))
while row:
yield row
row = tuple(next(iterable) for i in range(n))
列表切片技术
使用列表切片将一维列表数据转换为数据对:
zip(flat[0::2], flat[1::2])
反转序列
使用 reversed() 函数反转序列的示例:
def to_base(x: int, b: int) -> Iterator[int]:
return reversed(tuple(digits(x, b)))
使用 enumerate() 函数
enumerate() 函数为序列或可迭代对象添加下标信息,使用方法如下:
for index, value in enumerate(xi):
… print(index, value)
以上内容涉及了统计函数的实现、相关性计算及其在数据处理中的应用,掌握这些内容将有助于更好地进行数据分析和统计学学习。
📊 Python 高阶函数与归约方法

  1. enumerate() 函数
    “enumerate() 函数将输入序列的每一个元素变为一个二元组,其中第一个元素是下标值(序号),另一个是原始输入元素。”
    特点
    返回结果是可迭代对象,能够以任何可迭代对象为输入值。
    可以在统计处理过程中为每个样本值加序号,将一组简单值转换为一个时间序列。
    示例
    list(enumerate(xi))

输出示例:

[(0, 1.47), (1, 1.5), (2, 1.52), …]

  1. Python 内置归约方法
    2.1 any() 和 all()
    any() 和 all() 用于进行关键逻辑处理,相当于使用 or 或者 and 运算符进行归约。
    2.2 len() 和 sum()
    使用 len() 和 sum() 进行数值归约,能够实现一些高阶统计处理函数。
  2. 映射函数
    3.1 zip() 函数
    用于合并多个序列,可以结构化简单序列,或者将复合序列平铺为一维序列。
    3.2 reversed() 函数
    按照相反顺序输出输入序列,适用于某些算法需要以相反顺序呈现结果的情况。
  3. 高阶函数
    4.1 定义
    “高阶函数是指以函数为参数,或者以函数为返回值的函数。”
    4.2 三类高阶函数
    类型 描述
    输入参数中包含函数的函数 例如:map()、filter()等
    返回函数的函数 例如:装饰器
    输入参数和返回值均为函数 组合以上两种情况
    4.3 常用高阶函数
    max() 和 min():用于寻找极值。
    map():将函数应用于集合。
    filter():过滤集合元素。
  4. 使用 max() 和 min() 寻找极值
    5.1 默认行为模式
    max(1, 2, 3) # 输出:3
    max((1, 2, 3, 4)) # 输出:4
    5.2 示例:提取旅行数据
    包含起点、终点和距离的元组数据示例:
    trip = (
    ((37.54901619777347, -76.33029518659048), (37.840832, -76.273834), 17.7246),

    )
    5.3 提取距离信息
    long = max(dist for start, end, dist in trip)
    short = min(dist for start, end, dist in trip)
  5. 使用 Python 匿名函数
    6.1 定义
    “匿名函数是没有名字的函数,采用 lambda 形式,函数体只能是简单的表达式。”
    6.2 示例
    long = max(trip, key=lambda leg: leg[2])
    short = min(trip, key=lambda leg: leg[2])
    6.3 复用匿名函数
    可以将匿名函数赋给变量以便复用:
    start = lambda x: x[0]
    end = lambda x: x[1]
    dist = lambda x: x[2]
  6. 使用 map() 将函数应用于集合
    7.1 基本用法
    data = list(map(int, [‘2’, ‘3’, ‘5’, ‘7’]))

输出:[2, 3, 5, 7]

7.2 使用匿名函数
distances_miles = map(lambda x: (start(x), end(x), dist(x) * 6076.12 / 5280), trip)
8. 使用 map() 函数处理多个序列
8.1 zip() 与 map() 结合
可以将多个序列组合在一起处理:
map(function, zip(one_iterable, another_iterable))
8.2 示例:计算路径距离
distances_1 = map(lambda s_e: (s_e[0], s_e[1], haversine(*s_e)), zip(path, path[1:]))


以上内容涵盖了 Python 高阶函数和归约方法的核心概念和应用实例,适用于深入理解和复习相关知识。
第5章 高阶函数
📐 使用 map() 函数
“map() 函数的高级形式可以简化许多操作。”
map() 函数的定义
参数:map() 函数接收一个函数和多个可迭代对象。
功能:从每个可迭代对象中取出当前值,作为指定函数的输入参数。
示例代码
distances_2 = map(
lambda s, e: (s, e, haversine(s, e)),
path, path[1:]
)
在此示例中,匿名函数返回一个包含起点、终点和距离的三元组。
star-map 方法
map() 函数可以使用 star-map 方法处理任意多个可迭代对象。


🔍 使用 filter() 函数
“filter() 函数用于接收或舍弃数据。”
filter() 函数的作用
功能:将一个判定函数(谓词函数)应用于集合中的每个值。
返回值:如果谓词为真,则接收该值;否则将其舍弃。
示例代码
long = list(
filter(lambda leg: dist(leg) >= 50, trip)
)
在此示例中,谓词函数返回 True 时,路径段通过测试。
filter() 函数与生成器表达式
也可以使用生成器表达式来完成类似的过滤操作:
list(x for x in range(10) if x % 3 == 0 or x % 5 == 0)
返回的结果是 [0, 3, 5, 6, 9]。


⚖️ 检测异常值
异常值检测的实现方法
计算路径段的平均值和标准差,利用这些值来查找异常值。
示例代码
from ch04_ex4 import mean, stdev, z

dist_data = list(map(dist, trip))
μ_d = mean(dist_data)
σ_d = stdev(dist_data)
outlier = lambda leg: z(dist(leg), μ_d, σ_d) > 3
print(“Outliers”, list(filter(outlier, trip)))
计算出的异常值可以提高数据的一致性,降低计算偏差。


📚 使用 sorted() 函数
sorted() 函数的用法
功能:将数据按照指定规则排序。
方法:可以使用 list.sort() 或 sorted() 函数。
示例代码
sorted(trip, key=dist)
通过 key 参数指定排序方式,返回排序后的三元组列表。


🔧 编写高阶函数
高阶函数的分类
以函数为参数的函数。
以函数为返回值的函数。
以函数为参数并返回函数的函数。
高阶函数的应用
用于处理简单形式的数据,改变数据结构。
执行数据的打包、抽取、平铺等操作。


🔄 编写高阶映射和过滤函数
map() 和 filter() 的性能
map() 和 filter() 是处理数据的高阶函数,性能较优。
映射和过滤的实现方法
可以通过生成器表达式将映射和过滤合并到一次操作中。


📦 拆包并映射数据
拆包映射的实现
使用多重赋值拆解元组,并将函数应用于其中一部分数据。
示例代码
def convert(conversion: Conv_F, trip: Iterable[Leg]) -> Iterator[float]:
return (
conversion(distance) for start, end, distance in trip
)
此高阶函数使用转换函数处理原始数据。


🛠️ 打包多项数据并映射
打包数据的实现
创建高阶函数,将多个值打包成元组并进行映射。
示例代码
def cons_distance(distance: Point_Func, legs_iter: Iterable[Leg_Raw]) -> Iterator[Leg_D]:
return (
(start, end, round(distance(start, end), 4))
for start, end in legs_iter
)
该函数将每个路径段拆解并计算距离,返回三元组。


🔄 平铺数据并映射
通过嵌套元组平铺为简单可迭代对象,避免复杂状态的对象。
📊 高阶函数与生成器

  1. 数据结构的转换
    在处理数据结构时,可以通过生成器函数将文本转换为一维数值序列。以下是将多行文本合并为一个数值序列的示例:
    text = “”"
    2 3 5 7 11 13 17 19 23 29
    31 37 41 43 47 53 59 61 67 71
    73 79 83 89 97 101 103 107 109 113
    127 131 137 139 149 151 157 163 167 173
    179 181 191 193 197 199 211 223 227 229
    “”"

data = list(
v
for line in text.splitlines()
for v in line.split()
)
结果
上述代码执行后,data将包含以下字符串列表:
[‘2’, ‘3’, ‘5’, ‘7’, ‘11’, ‘13’, ‘17’, ‘19’, ‘23’, ‘29’,
‘31’, ‘37’, ‘41’, ‘43’, ‘47’, ‘53’, ‘59’, ‘61’, ‘67’, ‘71’,
‘73’, ‘79’, ‘83’, ‘89’, ‘97’, ‘101’, ‘103’, ‘107’, ‘109’,
‘113’, ‘127’, ‘131’, ‘137’, ‘139’, ‘149’, ‘151’, ‘157’,
‘163’, ‘167’, ‘173’, ‘179’, ‘181’, ‘191’, ‘193’, ‘197’,
‘199’, ‘211’, ‘223’, ‘227’, ‘229’]
2. 数值转换函数
为了将字符串转换为数值,需定义一个转换函数如下:
from numbers import Number
from typing import Callable, Iterator

Num_Conv = Callable[[str], Number]

def numbers_from_rows(conversion: Num_Conv, text: str) -> Iterator[Number]:
return (
conversion(value)
for line in text.splitlines()
for value in line.split()
)
使用示例
使用numbers_from_rows()函数将文本转换为浮点数列表:
print(list(numbers_from_rows(float, text)))
3. 结合高阶函数与生成器表达式
可以将高阶函数与生成器表达式结合,以更简洁地实现数据转换:
map(float,
value
for line in text.splitlines()
for value in line.split()
)
这种方法有助于理解算法的整体结构。
4. 过滤与结构化数据
4.1 过滤数据的定义
通过结构化数据算法,可以将带有结构化算法的过滤器与复杂函数结合。以下是从可迭代对象中取出部分值的函数示例:
from typing import Iterator, Tuple

def group_by_iter(n: int, items: Iterator) -> Iterator[Tuple]:
row = tuple(next(items) for i in range(n))
while row:
yield row
row = tuple(next(items) for i in range(n))
4.2 结合过滤与分组
通过将分组与过滤放在同一个函数体中,可以实现同时处理的功能:
def group_filter_iter(n: int, pred: Callable, items: Iterator) -> Iterator:
subset = filter(pred, items)
row = tuple(next(subset) for i in range(n))
while row:
yield row
row = tuple(next(subset) for i in range(n))
使用示例
group_filter_iter(7,
lambda x: x % 3 == 0 or x % 5 == 0,
range(1, 100)
)
5. 编写生成器函数
生成器函数可以简化函数的编写。使用生成器函数时,需要注意以下场景:
处理外部资源时使用with语句
需要使用while语句而非for语句
循环中使用break或return语句提前结束
使用try-except结构处理异常
6. 处理无效数据的映射
处理包含无效值的数据时,可以使用如下的map_not_none函数:
def map_not_none(func: Callable, source: Iterable) -> Iterator:
for x in source:
try:
yield func(x)
except Exception:
pass
使用示例
data = map_not_none(int, some_source)
7. 使用可调用对象构建高阶函数
可以通过创建Callable类对象来定义高阶函数。如下所示:
from typing import Callable, Optional, Any

class NullAware:
def init(self, some_func: Callable[[Any], Any]) -> None:
self.some_func = some_func

def __call__(self, arg: Optional[Any]) -> Optional[Any]: 
    return None if arg is None else self.some_func(arg) 

使用示例
null_log_scale = NullAware(math.log)
scaled = map(null_log_scale, some_data)
8. 设计模式回顾
在处理集合数据时,设计模式可供参考,包括:
返回生成器
类生成器
实例化集合
归约集合
标量
以上模式在编写高阶函数时都可以应用。
📚 归约与递归
归约函数概述
“归约函数是一种通过将输入数据转换为更简洁的结构体来简化数据处理的工具。”
functools.reduce() 函数
归约操作:functools.reduce() 是一个归约函数,可以将输入序列归约为单个值。
collections.Counter() 函数:可以视为一种归约,它对输入数据进行分组并计数,返回新的数据结构。
函数名称 功能描述
functools.reduce() 将序列归约为单个值
collections.Counter() 对输入数据分组并计数
递归与归约
“递归是一种通过函数调用自身来解决问题的编程技巧,通常用于定义归约算法。”
递归定义
归约操作通常可以通过递归定义。函数式语言的编译器会优化尾递归以提升性能。
在 Python 中,必须手动优化尾调用,将递归调用显式转换为循环。
递归的基本构成
基础情形:直接返回确定的值。
递归情形:通过不同的输入参数对自身进行调用。
递归构成 描述
基础情形 直接返回确定的值
递归情形 函数通过不同的输入参数对自身的调用给出返回值
归约算法实例

  1. 加法的递归实现
    def add(a: int, b: int) -> int:
    if a == 0:
    return b
    else:
    return add(a - 1, b + 1)
    输入约束:确保 a >= 0 和 b >= 0,否则递归将无法结束。
  2. 尾调用优化的实现
    阶乘函数的递归实现:
    def fact(n: int) -> int:
    if n == 0:
    return 1
    else:
    return n * fact(n - 1)
    改进的阶乘实现(命令式):
    def facti(n: int) -> int:
    if n == 0:
    return 1
    f = 1
    for i in range(2, n):
    f = f * i
    return f
  3. 斐波那契数列的递归实现
    斐波那契函数的简单实现:
    def fib(n: int) -> int:
    if n == 0:
    return 0
    if n == 1:
    return 1
    return fib(n - 1) + fib(n - 2)
    改进的斐波那契实现(有状态):
    def fibi(n: int) -> int:
    if n == 0:
    return 0
    if n == 1:
    return 1
    f_n2, f_n1 = 1, 1
    for _ in range(3, n + 1):
    f_n2, f_n1 = f_n1, f_n2 + f_n1
    return f_n1
    集合的归约与折叠
    sum() 函数的定义
    空集合的和:0
    非空集合的和:第一个元素加上剩余元素的和。
    def sum(collection: Sequence[float]) -> float:
    if len(collection) == 0:
    return 0
    return collection[0] + sum(collection[1:])
    乘积的归约
    空集合的乘积:1
    非空集合的乘积:第一个元素乘以剩余元素的乘积。
    def prod(collection: Sequence[float]) -> float:
    if len(collection) == 0:
    return 1
    return collection[0] * prod(collection[1:])
    collections.Counter 的使用
    Counter 的基本用法:
    from collections import Counter
    quantized = (5 * (dist // 5) for start, stop, dist in trip)
    frequency = Counter(quantized)
    输出频次:
    print(frequency.most_common())
    分组与映射
    用排序构建映射
    函数实现:
    def group_sort1(trip: Iterable[Leg]) -> Dict[int, int]:
    def group(data: Iterable[T_]) -> Iterable[Tuple[T_, int]]:
    previous, count = None, 0
    for d in sorted(data):
    if d == previous:
    count += 1
    else:
    if previous is not None:
    yield previous, count
    previous, count = d, 1
    if previous is not None:
    yield previous, count
    “通过对数据的分组和映射,可以高效地进行数据处理和分析。”
    🛠️ group-by 归约
  4. group() 函数的定义与实现
    函数目的
    group() 函数用于对排序后的数据集合进行遍历,并对相同值进行计数。
    函数实现
    def group(data: Iterable[T_]) -> Iterable[Tuple[T_, int]]:
    sorted_data = iter(sorted(data))
    previous, count = next(sorted_data), 1
    for d in sorted_data:
    if d == previous:
    count += 1
    elif previous is not None:
    yield previous, count
    previous, count = d, 1
    else:
    raise Exception(“Bad bad design problem.”)
    yield previous, count
    逻辑解析
    初始值设定: 从排序后的数据中取出第一个数据作为 previous 的初始值。
    递归方法: 每次递归处理一个值,增加计数器或产生新的键值对。
    特殊情况处理: 处理初始值为 None 的情况。
  5. 使用键值对数据分组
    数据分组
    对分组后的数据进行归约操作,计算各项统计值,如最大值、最小值、平均值和标准差等。
    示例代码
    from typing import Callable, Sequence, Dict, List, TypeVar

S_ = TypeVar(“S_”)
K_ = TypeVar(“K_”)

def group_by(key: Callable[[S_], K_], data: Sequence[S_]) -> Dict[K_, List[S_]]:
def group_into(key: Callable[[S_], K_], collection: Sequence[S_], dictionary: Dict[K_, List[S_]]) -> Dict[K_, List[S_]]:
if len(collection) == 0:
return dictionary
head, *tail = collection
dictionary[key(head)].append(head)
return group_into(key, tail, dictionary)

return group_into(key, data, defaultdict(list))

函数解析
辅助函数: group_into() 实现了递归逻辑,处理集合头部和尾部。
类型标示: 使用 TypeVar 来区分数据源类型和键类型。
3. 高阶归约方法
简单归约函数示例
def s0(data: Sequence) -> float:
return sum(1 for x in data)

def s1(data: Sequence) -> float:
return sum(x for x in data)

def s2(data: Sequence) -> float:
return sum(x * x for x in data)
高阶函数定义
def sum_f(function: Callable[[Any], float], data: Iterable) -> float:
return sum(function(x) for x in data)
过滤器扩展
def sum_filter_f(filter_f: Callable, function: Callable, data: Iterable) -> Iterator:
return sum(function(x) for x in data if filter_f(x))
4. 文件解析器的实现
解析器设计
解析器将底层文件内容转化为结构化数据。
实现示例
def lexical_scan(some_source):
for char in some_source:
if some pattern completed:
yield token
else:
accumulate token
XML 文件解析示例
def row_iter_kml(file_obj: TextIO) -> Iterator[List[str]]:
ns_map = {
“ns0”: “http://www.opengis.net/kml/2.2”,
“ns1”: “http://www.google.com/kml/ext/2.2”
}
xpath = (
“./ns0:Document/ns0:Folder/”
“ns0:Placemark/ns0:Point/ns0:coordinates”
)
doc = XML.parse(file_obj)
return (
comma_split(cast(str, coordinates.text))
for coordinates in doc.findall(xpath, ns_map)
)
数据转换
def pick_lat_lon(lon: Any, lat: Any, alt: Any) -> Tuple[Any, Any]:
return lat, lon
以上内容提供了 group-by 归约的详细实现与应用示例,涵盖了从数据分组到高阶归约及文件解析的全过程,适合用于深入理解该主题。
📊 解析与处理数据
高阶函数与生成器
“float_lat_lon() 是一个高阶函数,返回生成器表达式。”
函数定义
float_lat_lon():该函数使用生成器表达式,通过 map() 函数将 float() 应用于 pick_lat_lon() 函数的返回结果。
参数:*row 将行数据元组的每个元素作为 pick_lat_lon() 函数的参数。
示例
def float_lat_lon(row_iter: Iterator[Tuple[float, …]]) -> Iterator[Tuple[float, float]]:
return (
tuple(map(float, pick_lat_lon(*row)))
for row in row_iter
)
CSV 文件解析
示例数据
原始数据如下,数据之间以 Tab 字符分隔,文件前3 行是需要去掉的文件头部分:
Anscombe’s quartet
I II III IV
x y x y x y x y
10.0 8.04 10.0 9.14 10.0 7.46 8.0 6.58

CSV 解析器
row_iter_csv():返回一个包含 Tab 分隔符的迭代器。
import csv
def row_iter_csv(source: TextIO):
rdr = csv.reader(source, delimiter=“\t”)
return rdr
数据转换
转换函数
float_none():将格式正确的字符串转换为浮点数,将无效数据转换为 None。
from typing import Optional, Text
def float_none(data: Optional[Text]) -> Optional[float]:
try:
return float(data)
except ValueError:
return None
映射与验证
float_row:将整行数据转换为浮点数或 None。
from typing import Callable, List, Optional
R_Text = List[Optional[Text]]
R_Float = List[Optional[float]]

float_row: Callable[[R_Text], R_Float] = lambda row: list(map(float_none, row))
all_numeric:行级验证器,确保所有值都是浮点数。
all_numeric: Callable[[R_Float], bool] = lambda row: all(row) and len(row) == 8
解析带文件头的普通文本文件
复杂解析器
row_iter_gpl():处理包含文件头和尾部数据行的文件。
from typing import Tuple, Iterator, List
import re

Head_Body = Tuple[Tuple[str, str], Iterator[List[str]]]

def row_iter_gpl(file_obj: TextIO) -> Head_Body:
header_pat = re.compile(r"GIMP Palette\nName:\s*(.?)\nColumns:\s(.*?)\n#\n", re.M)

def read_head(file_obj: TextIO) -> Tuple[Tuple[str, str], TextIO]:
    match = header_pat.match("".join(file_obj.readline() for _ in range(4)))
    return (match.group(1), match.group(2)), file_obj

def read_tail(headers: Tuple[str, str], file_obj: TextIO) -> Head_Body:
    return (headers, (next_line.split() for next_line in file_obj))

return read_tail(*read_head(file_obj))

高阶解析函数
color_palette():使用命名元组处理颜色数据。
from typing import NamedTuple

class Color(NamedTuple):
red: int
green: int
blue: int
name: str

def color_palette(headers: Tuple[str, str], row_iter: Iterator[List[str]]) -> Tuple[str, str, Tuple[Color, …]]:
name, columns = headers
colors = tuple(
Color(int®, int(g), int(b), " ".join(name)) for r, g, b, *name in row_iter
)
return name, columns, colors
使用示例
with open(“crayola.gpl”) as source:
name, cols, colors = color_palette(*row_iter_gpl(source))
print(name, cols, colors)
小结
本章探讨了数据解析的基本技术,特别是如何使用高阶函数和生成器来处理和转换数据。通过示例展示了如何解析 CSV 文件和带文件头的文本文件,以及如何使用命名元组来组织和管理数据。
🗺️ 处理路径数据
“整个处理过程由一系列生成器表达式组成。”
生成器表达式
生成器表达式用于创建可迭代对象,以下是几个重要的生成器表达式的示例:
ip_iter = (
Leg(start, end, round(haversine(start, end), 4))
for start, end in pair_iter
)
关键点
Leg: 表示路径的起点和终点以及两者之间的距离。
haversine: 计算两个点之间的距离,保留四位小数。
数据提取
数据流
path_iter对象从KML文件中读取数据并提取为Point对象。
pair_iter对象通过legs()生成器函数创建路径的起点和终点。
示例
trip = list(trip_iter)
return trip
类型标注与数据转换
cast()函数
用于通知类型检查工具mypy,指定对象类型为TextIO。
cast(TextIO, codecs.getreader(‘utf-8’)(cast(BinaryIO, source)))
字节与文本的处理
使用codecs模块将字节转换为UTF-8编码的文本。
命名元组的应用
使用命名元组
NamedTuple帮助提高代码可读性。
from typing import NamedTuple

class Pair(NamedTuple):
x: float
y: float
三种创建命名元组的方法
方法 描述
根据位置对参数赋值 直接使用位置来指定参数值
星号参数 从可迭代对象中解包参数值
显式参数名称赋值 使用明确的参数名称来赋值,增强代码的可读性
斯皮尔曼等级相关系数
计算方法
斯皮尔曼等级相关系数用于表征两组变量的相关度,通过比较变量的等级而非具体值。
y_rank = list(rank_y(series_I))
生成器函数示例
def rank_y(pairs: Iterable[Pair]) -> Iterator[RankedPair]:
return enumerate(sorted(pairs, key=lambda p: p.y))
数据结构的设计
数据结构的选择
采用不可变对象替代有状态的类,减少复杂性。
数据结构类型 描述
Tuple 不可变的元组,适用于简单数据结构
NamedTuple 带有字段名称的元组,适合提高可读性
处理等级值
使用高阶函数对数据进行等级值赋值,避免状态变化。
def rank(data: Iterable[D_], key: Callable[[D_], K_]=lambda obj: cast(K_, obj)) -> Iterator[Tuple[float, D_]]:
示例数据处理
示例代码
data = [(2, 0.8), (3, 1.2), (5, 1.2), (7, 2.3), (11, 18)]
list(rank(data, key=lambda x:x[1]))
输出结果
输出包含等级值和原始数据的二元组列表。
选择器函数
定义选择器函数
用于从复杂的数据结构中提取数据项。
x_rank = lambda ranked: ranked[0]
y_rank = lambda ranked: ranked[1][0]
raw = lambda ranked: ranked[1][1]
处理策略
通过包装和拆包策略来处理数据,避免使用有状态的类定义。
总结
在处理路径数据时,使用生成器表达式、命名元组以及高阶函数等技术来简化数据处理和提高代码的可读性。
📊 数据变换与等级排序
数据变换过程
数据变换涉及到对 x 和 y 的等级值求取,整个过程分为两步:

  1. 简单包装
  2. 更通用的“拆包−再包装”
    等级排序函数定义
    函数 rank_xy
    def rank_xy(pairs: Sequence[Pair]) -> Iterator[Ranked_XY]:
    return (
    Ranked_XY(
    r_x=r_x, r_y=rank_y_raw[0], raw=rank_y_raw[1])
    for r_x, rank_y_raw in
    rank(rank_y(pairs), lambda r: r.raw.x)
    )
    步骤解释:
    使用 rank_y() 函数创建 Rank_Y 对象。
    对这些对象应用 rank() 函数,基于原始数据中的 x 属性求等级值。
    函数返回一个二元组:x 等级值和 Rank_Y 对象。
    最后基于 x 等级值 r_x、y 等级值 rank_y_raw[0] 和原始对象 rank_y_raw[1] 创建 Ranked_XY 对象。
    元组处理与Ranked_XY对象
    Ranked_XY 对象的构建:
    通过拆包原有数据并再次打包,形成更复杂的结构,这是向元组中添加新变量的一种常用方法。
    示例数据
    data = (Pair(x=10.0, y=8.04), Pair(x=8.0, y=6.95),
    Pair(x=13.0, y=7.58), Pair(x=9.0, y=8.81),
    Pair(x=5.0, y=5.68))
    创建等级对象:
    list(rank_xy(data))

输出: [

Ranked_XY(r_x=1.0, r_y=1.0, raw=Pair(x=4.0, y=4.26)),

Ranked_XY(r_x=2.0, r_y=3.0, raw=Pair(x=5.0, y=5.68)),

Ranked_XY(r_x=11.0, r_y=10.0, raw=Pair(x=14.0, y=9.96))

]

计算斯皮尔曼等级顺序相关度
斯皮尔曼相关度公式
斯皮尔曼等级顺序相关度用于比较两个变量等级的相关性,计算公式为:
ρ=1-(6∑d2)/(n(n2-1))
其中 d 为对应观测值的等级值差,n 为观测值对的数量。
函数 rank_corr
def rank_corr(pairs: Sequence[Pair]) -> float:
ranked = rank_xy(pairs)
sum_d_2 = sum((r.r_x - r.r_y) ** 2 for r in ranked)
n = len(pairs)
return 1 - 6 * sum_d_2 / (n * (n ** 2 - 1))
使用 Rank_XY 对象表示数据对,计算 r_x 和 r_y 属性的差值,然后取差值的平方和。
相关系数的含义
相关系数为 0 表示没有相关性,接近 1 或 -1 则表示相关性很强。
示例数据集
基于安斯库姆四重奏数据序列的示例:
data = (Pair(x=10.0, y=8.04), Pair(x=8.0, y=6.95),
Pair(x=13.0, y=7.58), Pair(x=9.0, y=8.81),
Pair(x=11.0, y=8.33), Pair(x=14.0, y=9.96),
Pair(x=6.0, y=7.24), Pair(x=4.0, y=4.26),
Pair(x=12.0, y=10.84), Pair(x=7.0, y=4.82),
Pair(x=5.0, y=5.68))
round(pearson_corr(data), 3)

输出: 0.816

说明该数据集的相关性很强。

皮尔逊相关系数
皮尔逊相关系数的计算
import Chapter_4.ch04_ex4
def pearson_corr(pairs: Sequence[Pair]) -> float:
X = tuple(p.x for p in pairs)
Y = tuple(p.y for p in pairs)
return ch04_ex4.corr(X, Y)
将 Pair 对象拆开后,作为输入传给 corr() 函数,得到了皮尔逊相关系数。
比较斯皮尔曼与皮尔逊
使用多种统计工具进行比较是重要的,斯皮尔曼和皮尔逊相关系数在某些数据集中可能差别不大,但在其他数据集中差别会很大。
多态与类型匹配
Python 的类型处理
Python 动态确定运算符的最终实现方式,函数的输入输出数据类型不需要严格匹配。
解决方案
使用 isinstance() 函数确定具体的数据类型。
创建 numbers.Number 或 NamedTuple 的子类并实现多态方法。
抽象化等级排序
定义命名元组 Rank_Data
from typing import NamedTuple, Tuple, Any
class Rank_Data(NamedTuple):
rank_seq: Tuple[float]
raw: Any
处理不同数据结构
rank_data() 函数处理多种类型的输入数据,包括 Sequence 和 Iterator。
def rank_data(
seq_or_iter: Union[Sequence[Source], Iterator[Source]],
key: Callable[[Rank_Data], K_] = lambda obj: cast(K_, obj)
) -> Iterable[Rank_Data]:
该函数可以处理四种组合:Sequence[Rank_Data]、Sequence[Any]、Iterator[Rank_Data]、Iterator[Any]。
尾递归优化
运用尾调用优化技术将递归优化为循环,确保单元测试通过后再进行优化。
示例
scalars = [0.8, 1.2, 1.2, 2.3, 18]
list(rank_data(scalars))

输出: [

Rank_Data(rank_seq=(1.0,), raw=0.8),

Rank_Data(rank_seq=(2.5,), raw=1.2),

]

处理更复杂的数据时,可能会出现多个等级值。

📦 itertools 模块

  1. itertools 模块简介
    “itertools 模块提供了许多处理可迭代对象的高级方法,这些方法有助于实现简洁明了的函数式设计。”
    核心思想
    无状态对象:函数式编程强调使用无状态对象,通过生成器表达式、生成器函数和可迭代对象来替代可变对象。
    实现细节:用户无需过多关注可迭代对象的实现细节,而是通过利用可迭代对象的优势来编写简洁的程序。
  2. 使用无限迭代器
    2.1 无限迭代器的函数
    itertools 模块提供了多种无限迭代器的函数,主要包括:
    函数 功能说明
    count() range() 函数的无限版本,给出起始值和可选步长
    cycle() 循环迭代一组值
    repeat() 按指定次数重复单个值
    2.2 用 count() 计数
    count() 函数与 range() 函数不同,count() 不需要定义上界,只需给出起始值和步长。
    enumerate = lambda x, start=0: zip(count(start), x)
    示例:
    list(zip(count(), iter(‘word’))) # 输出: [(0, ‘w’), (1, ‘o’), (2, ‘r’), (3, ‘d’)]
    2.3 使用实数参数计数
    count() 函数可以接收非整形参数,例如 count(0.5, 0.1)。注意使用整形参数来避免累积误差。
    until(terminate: Callable[[T_], bool], iterator: Iterator[T_]) -> T_
    2.4 用 cycle() 循环迭代
    cycle() 函数可以重复循环一组值,适用于将数据集标识符与数据集进行分组。
    示例:生成True和False值序列。
    m3 = (i == 0 for i in cycle(range(3)))
    结合 zip() 函数,处理数值序列与标识序列。
    multipliers = zip(range(10), m3, m5)
    2.5 用 repeat() 重复单个值
    repeat() 函数用于重复返回单个值,可以用来替代 cycle() 函数。
    choose = lambda rule: (x == 0 for x in rule)
    示例:
    all = repeat(0)
    subset = cycle(range(100))
  3. 使用有限迭代器
    3.1 有限迭代器的函数
    itertools 模块还包含许多用于生成有限序列的函数,主要包括:
    函数 功能说明
    enumerate() 将值与其在原始序列中的位置序号组对
    accumulate() 返回可迭代对象的归约序列
    chain() 将多个可迭代对象按顺序组合在一起
    groupby() 根据指定函数将输入的可迭代对象分割为多个子对象
    zip_longest() 合并多个可迭代对象,短的部分用填充值填充
    compress() 根据布尔可迭代对象筛选第一个可迭代对象
    islice() 切片函数的可迭代对象版本
    dropwhile() 根据布尔函数过滤可迭代对象
    filterfalse() 反向过滤可迭代对象
    starmap() 将函数应用于由元组组成的可迭代序列
    3.2 用 enumerate() 添加序号
    enumerate() 函数可以将一个值与其在原始序列中的位置序号组对。
    pairs = tuple(enumerate(sorted(raw_values)))
    示例:
    raw_values = [1.2, .8, 1.2, 2.3, 11, 18]
    tuple(enumerate(sorted(raw_values))) # 输出: ((0, 0.8), (1, 1.2), (2, 1.2), (3, 2.3), (4, 11), (5, 18))
    3.3 更复杂的数据结构
    通过定义更复杂的类,可以将序号信息加入元组中。
    class Point(NamedTuple):
    latitude: float
    longitude: float

class Leg(NamedTuple):
start: Point
end: Point
distance: float
生成器函数示例:
def ordered_leg_iter(pair_iter: Iterator[Tuple[Point, Point]]) -> Iterator[Leg]:
for order, pair in enumerate(pair_iter):
start, end = pair
yield Leg(order, start, end, round(haversine(start, end), 4))
3.4 应用场景
该模块在处理大型数据集时非常有用,可以通过简单的脚本实现数据清洗和过滤操作。
📊 使用 itertools 模块
8.2.1 用 accumulate() 计算汇总值
accumulate() 函数基于给定的函数返回一个可迭代对象,将一系列归约值汇总在一起,遍历迭代器得出当前汇总值。
默认的函数是 operator.add()。
可以使用乘积函数代替默认的求和函数来改变 accumulate() 的行为。
示例:计算四等分值
使用当前汇总值计算数据四等分值:
“quartiles”=(“int” (4⋅d/“total” )" for " d" in distance_accum" )
示例代码:
distances = (leg.distance for leg in trip)
distance_accum = tuple(accumulate(distances))
total = distance_accum[-1] + 1.0
quartiles = tuple(int(4 * d / total) for d in distance_accum)
8.2.2 用 chain() 组合多个迭代器
chain() 函数将多个迭代器组合为单个迭代器,例如将被 groupby() 函数分开的数据重新组合起来。
示例:处理多个文件中的数据
使用 contextlib.ExitStack() 方法结合 chain() 函数:
from contextlib import ExitStack
import csv

def row_iter_csv_tab(*filenames: str) -> Iterator[List[str]]:
with ExitStack() as stack:
files = [stack.enter_context(open(name, ‘r’)) for name in filenames]
readers = map(lambda f: csv.reader(f, delimiter=‘\t’), files)
yield from chain(*readers)
8.2.3 用 groupby() 切分迭代器
groupby() 函数将一个迭代器切分为多个小迭代器。
示例:分组数据
使用 zip() 函数将四等分值和原始数据组合:
group_iter = groupby(zip(quartile, trip), key=lambda q_raw: q_raw[0])
for group_key, group_iter in group_iter:
print(group_key, tuple(group_iter))
注意事项
groupby() 函数的输入中的 key 值必须是排序好的,以确保分在一组中的元素是相邻的。
8.2.4 用 zip_longest() 和 zip() 合并迭代器
zip() 返回结果的长度是输入参数中最短序列的长度。
zip_longest() 为较短的序列填充值,直到遍历完最长的序列。
示例:使用 zip_longest()
from itertools import zip_longest

示例数据

a = [1, 2, 3]
b = [‘a’, ‘b’]

合并

result = list(zip_longest(a, b, fillvalue=‘-’))
8.2.5 用 compress() 过滤
内置的 filter() 函数使用谓词来确定对某个数据项的取舍。
示例:使用 compress()
def filter(function, iterable):
i1, i2 = tee(iterable, 2)
return compress(i1, map(function, i2))
8.2.6 用 islice() 选取子集
示例:选择数据对
使用 islice() 函数从大型数据集中提取一个子集:
from itertools import islice

flat = [‘2’, ‘3’, ‘5’, ‘7’, ‘11’, ‘13’]
flat_iter_1 = iter(flat)
flat_iter_2 = iter(flat)

result = list(zip(islice(flat_iter_1, 0, None, 2), islice(flat_iter_2, 1, None, 2)))
8.2.7 用 dropwhile() 和 takewhile() 过滤状态
dropwhile() 和 takewhile() 是有状态的过滤函数。
示例:过滤文件头部和尾部
with open(“crayola.gpl”) as source:
rdr = csv.reader(source, delimiter=‘\t’)
rows = dropwhile(lambda row: row[0] != ‘#’, rdr)
8.2.8 基于 filterfalse() 和 filter() 的两种过滤方法
filterfalse = (lambda pred, iterable: filter(lambda x: not pred(x), iterable))
8.2.9 将 starmap() 和 map() 应用于数据
map() 函数是高阶函数,能对可迭代对象中的每个元素进行映射。
starmap() 函数适用于处理嵌套元组式的数据结构。
示例:使用 starmap() 生成 Leg 对象
from Chapter_7.ch07_ex1 import float_lat_lon, Leg, Point

示例代码创建 Leg 对象

以上是本节的主要内容,涵盖了使用 itertools 模块的各种方法和示例。
📚 itertools 模块与迭代器

  1. 创建路径段
    make_leg 函数
    make_leg = (lambda start, end:
    Leg(start, end, haversine(start,end))
    ) # type: Callable[[Point, Point], Leg]
    输入: 一对 Point 对象
    返回值: 包含起点、终点和距离的 Leg 对象。
    legs 函数
    功能: 生成包含一段路径起点和终点的 Point 对象二元组,用作 make_leg() 的输入,并最终生成 Leg 对象。
    starmap 方法
    功能: 使用 starmap(function, some_list) 方法可以避免写出 (function(*args) for args in some_list) 的冗长生成器表达式,提高可读性。
  2. 使用 tee() 函数克隆迭代器
    迭代器的规则
    “迭代器只能使用一次。”
    tee() 函数的使用
    功能: 克隆可迭代对象,允许多次使用序列中的数据而不必实例化整个序列。
    示例:计算算术平均值
    def mean(iterator: Iterator[float]) -> float:
    it0, it1 = tee(iterator, 2)
    N = sum(1 for x in it0)
    s1 = sum(x for x in it1)
    return s1 / N
    注意: float 类型标示并不排除使用整数,mypy 能处理类型转换。
    tee() 函数的限制
    在大多数 Python 实现中,克隆通过实例化序列实现,处理小数据集时效果良好,但对于大型数据集效果往往不佳。
    实现需要遍历源迭代器,使用时需谨慎。
  3. itertools 模块代码范例
    自定义函数总结
    函数名 参数列表 返回结果
    take (n, iterable) 以列表形式返回可迭代对象的前 n 个值。
    tabulate (function, start=0) 返回 function(0), function(1), …。
    consume (iterator, n) 使可迭代对象前进 n 步,返回空列表。
    nth (iterable, n, default=None) 返回可迭代对象的第 n 个值。
    quantify (iterable, pred=bool) 返回满足谓词函数的元素个数。
    padnone (iterable) 返回输入可迭代对象的值,并在后面追加无限个 None 值。
    ncycles (iterable, n) 返回 n 次可迭代对象中的值。
    dotproduct (vec1, vec2) 返回两个向量对应分量乘积之和。
    flatten (listOfLists) 展开一层嵌套列表,链接为一个列表。
    repeatfunc (func, times=None, *args) 执行 times 次函数 func。
    pairwise (iterable) 返回相邻元素的元组。
  4. 小结
    本章介绍了 itertools 模块中的部分函数,帮助我们更好地使用可迭代对象。
    讨论了无限迭代器和有限迭代器的区别,并展示了如何使用这些函数简化代码。
    提到官方文档中的“Itertools Recipes”部分是学习的重要资源。
  5. 高级 itertools 技术
    迭代器函数分类
    无限迭代器: 适用于任何可迭代对象。
    有限迭代器: 生成数据源的归约。
    tee() 函数: 将一个迭代器克隆多份。
    笛卡儿积
    定义: 基于一组集合生成所有可能的元素组合。
    示例
    list(product(range(1, 14), ‘♣♦♥♠’))
    对积进行归约
    可以通过 filter() 和 product() 函数实现表间的 join 操作。
    性能分析
    使用并行策略、保存中间计算结果以避免重复计算、使用新算法可以提高处理速度。
    🖼️ 颜色映射与处理
    颜色映射的基本概念
    在图像处理中,颜色映射是将源颜色转换为目标颜色的过程。通过预先计算所有可能的映射关系,可以避免重复计算,从而提高效率。
    具体实现步骤
  6. 评估问题规模
    在处理图像时,首先需要评估问题的规模。以下是一个示例代码,用于计算一张名为“IMG_2705.jpg”的图片的颜色元组总规模:
    from collections import defaultdict, Counter
    palette = defaultdict(list)
    for xy, rgb in pixel_iter(img):
    palette[rgb].append(xy)

w, h = img.size
print(“Total pixels”, w * h)
print(“Total colors”, len(palette))
2. 计算结果
对一幅具体的图像进行颜色计算时,可以得到以下信息: - 像素总数为 9,980,928,符合一个 10MB 大小的图片的预期。 - 共有 210,303 种颜色,计算这些颜色与 133 种目标颜色之间的欧氏距离需要 27,970,299 次计算,耗时约 76 秒。
3. 使用比特遮罩
比特遮罩用于保留重要的比特,去掉不重要的比特。以下是一个例子,展示如何使用 Python 的 bin() 函数查看颜色的二进制表示:

bin(200)
‘0b11001000’
200 & 0b11100000
192
bin(192)
‘0b11000000’

  1. RGB 三元组的遮罩处理
    对 RGB 三元组进行遮罩处理的代码如下:
    masked_color = tuple(map(lambda x: x & 0b11100000, c))
  2. 重构问题
    为了将源颜色映射到目标颜色,只需将 20 万种颜色保存在一个表中,而不是直接比较所有像素和颜色。实现步骤如下:
    计算源色彩到目标色彩的映射。
    计算源颜色表中与最近颜色的欧氏距离,执行一次生成 20 万个映射。
    基于修正后的颜色表生成新图像。
  3. 组合变换
    组合多个变换可以构建出复杂的映射关系。可以将颜色截断和映射组合在一起,以实现更高效的算法。
    排列与组合
  4. 排列集合元素
    排列集合元素指列出集合中元素的所有排列形式。对于长度为 n 的集合,共有 n! 种排列形式。
  5. 生成所有组合
    除了排列,itertools 模块还提供了计算集合元素组合的函数。对于一个给定的集合,组合的数量远小于排列的数量。
    计算组合的示例
    使用 itertools 中的 combinations 函数可以列出所有可能的组合。例如,5 张扑克牌共有 2,598,960 种组合方式。
    hands = list(combinations(tuple(product(range(13), ‘♠♥♦♣’)), 5))
    处理相关性分析
    在进行数据分析时,可以使用组合生成所有需要比较的变量对,示例如下:
    for p, q in combinations(range(9), 2):
    header_p, *data_p = list(column(source, p))
    header_q, *data_q = list(column(source, q))
    总结
    本章讲解了如何使用 Python 的 itertools 模块进行图像处理中的颜色映射,以及如何进行组合和排列的计算。这些技术和工具可以帮助开发者更高效地处理和分析数据。
    🔧 functools 模块
    高阶函数与装饰器
    “高阶函数是以函数为参数或返回函数的函数,能够大大提升编程的灵活性和可读性。”
    functools 库中的函数
    update_wrapper() 和 wraps():这两个函数将在下一章详细介绍,用于编写自定义装饰器。
    cmp_to_key():用于将基于比较的 Python 2 版本代码转换为 Python 3 的键值提取方式。本书专注于 Python 3,主要讨论如何正确编写键值函数。
    10.1 函数工具
    在第5章中介绍过的高阶函数包括: - max() - min() - sorted()
    这些函数通过接收 key= 参数实现定制化。另有 map() 和 filter() 函数,它们通过接收一个函数和一个可迭代对象,将函数应用于可迭代对象。
    函数 描述
    map() 将指定函数应用于可迭代对象的每个元素
    filter() 根据函数返回的布尔值筛选可迭代对象中的值
    10.2 使用 lru_cache 保存已有计算结果
    “使用 @lru_cache 装饰器能加快函数运行。LRU 指最近使用的,缓存中保留最近使用的计算结果。”
    示例:斐波那契数列
    from functools import lru_cache

@lru_cache(128)
def fibc(n: int) -> int:
if n == 0: return 0
if n == 1: return 1
return fibc(n - 1) + fibc(n - 2)
当调用 fibc(n) 时,会检查缓存池,如果参数 n 在缓存中,则直接返回结果,避免重复计算。
每次计算的结果都会保存到缓存中,显著提升性能。
性能对比
通过 timeit 模块验证性能提升:
实现方式 计算时间 (秒)
非缓存实现 3.23
带缓存实现 0.0779
注意事项
使用 fibc.cache_clear() 方法可清除缓存。
缓存技术在计算重复的结果时非常有效,但在不常用的场景中可能会增加开销。
10.3 使用 total_ordering 定义类
“使用 @total_ordering 装饰器可以定义实现多种比较运算的类。”
示例:扑克牌类
from functools import total_ordering
from typing import NamedTuple

@total_ordering
class Card(NamedTuple):
rank: int
suit: str

def __eq__(self, other):
    return self.rank == other.rank

def __lt__(self, other):
    return self.rank < other.rank
通过 @total_ordering,类自动生成其他比较方法,如 __le__()、__gt__() 和 __ge__()。

使用示例
c2 = Card(2, ‘♣’)
c3 = Card(3, ‘♥’)
c2 <= c3 # True
10.4 使用 partial() 函数应用部分参数
“partial() 函数生成基于旧函数及其部分参数的新函数,与柯里化密切相关。”
示例
from functools import partial

exp2 = partial(pow, 2)
print(exp2(12)) # 4096
partial() 函数通过绑定部分参数来简化函数调用。
10.5 使用 reduce() 函数归约数据集
“可以将 sum()、len()、max() 和 min() 函数视为 reduce() 函数的特殊形式。”
示例:使用 reduce()
from functools import reduce

d = [2, 4, 4, 4, 5, 5, 7, 9]
result = reduce(lambda x, y: x + y, d)
归约操作是通过指定函数结合可迭代对象中的相邻元素。
注意事项
使用 reduce() 时务必提供初始值,避免计算错误。
项目 描述
初始值 必须提供以确保计算过程正确
归约函数示例 reduce(lambda x, y: x + y, data, 0)
合并 map() 和 reduce()
def map_reduce(map_fun, reduce_fun, source):
return reduce(reduce_fun, map(map_fun, source))
map_reduce 函数结合了映射和归约操作,提供了灵活的数据处理方式。
以上内容涵盖了 functools 模块的主要功能和使用示例,掌握这些知识点将有助于在编写高效且可读的 Python 代码时发挥作用。
🛠️ 使用 functools 模块进行数据处理

  1. 使用 reduce() 函数和 partial() 函数
    “可以将 sum() 函数定义为 partial(reduce, operator.add),这也提示我们可以用类似的方法创建其他映射和归约。”
    partial() 函数
    partial() 函数可以通过部分应用参数创建新函数,简化函数的调用。
    示例代码:
    from functools import partial
    from operator import add
    sum2 = partial(reduce, lambda x, y: x + y ** 2)
    count = partial(reduce, lambda x, y: x + 1)
    reduce() 函数
    reduce() 函数的时间复杂度是 O(n^2),在处理大规模数据时效率较低。
    通过 partial() 函数的使用,可以改善某些情况下的性能。
  2. 使用 map() 函数和 reduce() 函数清洗数据
    数据清洗函数
    定义一个清洗函数 comma_fix(),用于将字符串转换为浮点数。
    def comma_fix(data: str) -> float:
    try:
    return float(data)
    except ValueError:
    return float(data.replace(“,”, “”))
    清洗和求和
    使用 clean_sum 函数进行数据清洗和求和:
    from functools import reduce
    from typing import Iterable, Callable

def clean_sum(cleaner: Callable[[str], float], data: Iterable[str]) -> float:
return reduce(operator.add, map(cleaner, data))
示例
清洗数据并计算总和:
d = (‘1,196’, ‘1,176’, ‘1,269’, ‘1,240’, ‘1,307’, ‘1,435’, ‘1,601’, ‘1,654’, ‘1,803’, ‘1,734’)
clean_sum(comma_fix, d) # 输出:14415.0
3. 避免重复调用清洗函数
“应避免多次调用清洗函数,比如计算一组数据的平方和时,不应执行如下命令:”
示例不当代码:
comma_fix_squared = lambda x: comma_fix(x) ** 2
更好的做法是使用 lru_cache 装饰器或者将清洗结果存储为临时对象。
4. 使用 groupby() 函数和 reduce() 函数
数据分组
使用 defaultdict(list) 进行数据分组:
from collections import defaultdict
from typing import Iterable, Callable, Dict, List, TypeVar, Iterator, Tuple, cast

D_ = TypeVar(“D_”)
K_ = TypeVar(“K_”)

def partition(source: Iterable[D_], key: Callable[[D_], K_] = lambda x: cast(K_, x)) -> Iterable[Tuple[K_, Iterator[D_]]]:
pd: Dict[K_, List[D_]] = defaultdict(list)
for item in source:
pd[key(item)].append(item)
for k in sorted(pd):
yield k, iter(pd[k])
示例数据
原始数据示例:
data = [(‘4’, 6.1), (‘1’, 4.0), (‘2’, 8.3), (‘2’, 6.5), (‘1’, 4.6),
(‘2’, 6.8), (‘3’, 9.3), (‘2’, 7.8), (‘2’, 9.2), (‘4’, 5.6),
(‘3’, 10.5), (‘1’, 5.8), (‘4’, 3.8), (‘3’, 8.1), (‘3’, 8.0),
(‘1’, 6.9), (‘3’, 6.9), (‘4’, 6.2), (‘1’, 5.4), (‘4’, 5.8)]
分组统计
使用 summarize() 函数计算每组的均值和方差:
mean = lambda seq: sum(seq) / len(seq)
var = lambda mean, seq: sum((x - mean) ** 2 / mean for x in seq)

def summarize(key_iter: Tuple[K_, Iterable[Item]]) -> Tuple[K_, float, float]:
key, item_iter = key_iter
values = tuple(v for k, v in item_iter)
m = mean(values)
return key, m, var(m, values)
分组结果
计算各组的统计汇总值:
partition1 = partition(data, key=lambda x: x[0])
groups1 = map(summarize, partition1)
5. 小结
本章介绍了 functools 库中的重要函数,如 partial()、reduce() 和 lru_cache,它们在数据处理和函数优化中起到了重要作用。
通过使用高阶函数,可以构建复杂的功能并提高代码的可读性和效率。
🛠️ 装饰器设计技术
11.2 横切关注点
“装饰器背后的一条通用原则是可以从装饰器和使用该装饰器的原始函数中构建出复合函数。”
横切关注点的定义
横切关注点是指适用于多种函数的设计问题,通常希望通过装饰器实现的一次性设计。这些关注点包括:
日志记录
审计
安全
处理不完整数据
示例
日志记录装饰器:可将标准化消息写入应用程序的日志文件。
审计装饰器:可记录数据库更新的相关详细信息。
安全装饰器:可检查用户的权限。
处理空值的装饰器
在一些应用中,希望函数支持空值(如 None),并在遇到空值时返回 None 而不是引发异常。
11.3 复合设计
复合函数的数学表示法
复合函数可以表示为:
f(g(x))=(f∘g)(x)
Python 中的复合函数
可以通过装饰器实现复合函数,例如:
@f_deco
def g(x):
something
生成的函数等同于复合函数 f(g(x))。装饰器 f_deco() 必须融合 f() 的内部定义与给定的 g() 来定义并返回复合函数。
封装器结构
装饰器的封装器结构包括两部分:
参数部分:封装函数的参数。
返回部分:封装函数的返回结果。
复合函数的形式
装饰器可以创建更复杂的复合函数,如:
f_β (g(f_α (x)))
装饰器可以使用任意 Python 语句,增强函数的灵活性。
高阶函数的复合
创建 map(), filter(), 和 reduce() 等高阶函数的复合函数很容易,且通常能提高程序的可读性。例如:
map(f, map(g, x))
可以用复合函数 f_g 来简化描述:
f_g = lambda x: f(g(x))
11.4 向装饰器添加参数
参数化装饰器的定义
可以使用额外参数自定义装饰器,创建更复杂的处理逻辑:
@deco(arg)
def func(x):
something
示例代码
def func(x):
return something(x)
concrete_deco = deco(arg)
func = concrete_deco(func)
11.5 实现更复杂的装饰器
堆叠装饰器
可以通过堆叠多个装饰器来修改函数的结果,例如:
@f_wrap
@g_wrap
def h(x):
something
类型提示
在定义函数时,建议使用类型提示以保持代码清晰,例如:
from typing import Callable

m1: Callable[[float], float] = lambda x: x - 1
p2: Callable[[float], float] = lambda y: 2 ** y
mersenne: Callable[[float], float] = lambda x: m1(p2(x))
11.6 复杂设计注意事项
清洗和转换函数的设计
在数据清洗中,需要实现更复杂的逻辑,例如处理地理位置数据时可能遇到的格式转换问题。
复合函数的实现
在处理清洗和转换时,可以选择将清洗函数作为转换函数的参数,或反之。这两种设计模式各有优劣,但保留被装饰函数的类型提示是重要的。
示例代码
def cleanse_before(cleanse_function: Callable) -> Callable[[F], F]:
def abstract_decorator(converter: F) -> F:
@wraps(converter)
def cc_wrapper(text: str, *args, **kw) -> Any:
try:
return converter(text, *args, **kw)
except (ValueError, decimal.InvalidOperation):
cleaned = cleanse_function(text)
return converter(cleaned, *args, **kw)
return cast(F, cc_wrapper)
return abstract_decorator
灵活的清洗和转换函数示例
@cleanse_before(drop_punct)
def to_int(text: str, base: int = 10) -> int:
return int(text, base)
通过这些设计,可以创建既灵活又能处理不良数据的函数。
🛠️ 装饰器与并发编程
装饰器的定义与使用
装饰器简介
“装饰器是用于修改或增强函数功能的一种设计模式,通过在函数定义上使用@decorator语法来应用。”
示例:使用装饰器封装内置函数
定义 to_int 函数:
def to_int(x):
return int(x)
优化定义:使用 drop_punct() 函数封装内置的 int() 转换函数。
to_int2 = cleanse_before(drop_punct)(int)
使用示例
调用 to_int 函数:
>>> to_int(“1,701”)
1701

to_int(“97”)
97
装饰器的效果
装饰器可以重写和简化底层函数的类型提示。
装饰器的设计考量
简单固定内容:装饰器适用于在函数中添加简单且固定的内容。
复杂参数化装饰器:设计复杂的参数化装饰器时需谨慎,因为其可能导致设计复杂。
装饰器适用场景 描述
简单内容 适合用于添加基础设施或支撑内容
复杂设计 可能会导致设计复杂,不如复合函数清晰
高阶函数与并行性
高阶函数的替代方案
创建可调用的类定义以实现插件策略对象,可能比装饰器更简单。
使用偏函数在某些情况下可能比装饰器更直观。
横切关注点示例
日志记录与安全测试:这些特性是非特定于某个领域的后台处理。
小结
装饰器类型
不带参数的装饰器
参数化装饰器
使用注意
“使用 functools.wraps() 装饰器可以确保装饰器能正确复制被封装函数的属性。”
多进程与多线程编程
并发编程的基本概念
并发:指在不依赖于任务间的相互关系时进行高效处理。
多进程与多线程:两者的不同在于数据共享与资源管理。
进程与线程的比较
特点 进程 线程
资源共享 彼此独立,少交互 共享同一进程的资源
线程调度 操作系统负责 受GIL(全局解释器锁)影响
并发更新 更少的状态更新 可能会造成混乱,需要加锁
函数式编程与并发
函数式设计
函数式编程应尽量减少共享对象的更新,以避免并发问题。
高效的并发设计允许在多个处理器或核心上分配任务。
进程级并行性
利用进程级并行性可以完全忽略GIL的影响。
并发的意义与实现
计算与I/O的交错
在I/O密集型程序中,通过交错计算与I/O请求可以显著提升性能。
交错计算与I/O的策略
策略 描述
阶段化处理 每项数据经过多个处理阶段,交替执行计算与I/O
并发工作池 每个工作者负责处理一个数据项,生成结果
使用多进程池
multiprocessing 模块引入了 Pool 对象,允许多个进程并发执行。
应用程序需拆分为支持非严格执行的组件。
解析日志文件示例
读取与解析日志文件
使用 gzip 格式保存的日志文件,解析并提取数据。
示例代码:
from typing import Iterator
def local_gzip(pattern: str) -> Iterator[Iterator[str]]:
zip_logs = glob.glob(pattern)
for zip_file in zip_logs:
with gzip.open(zip_file, “rb”) as log:
yield (line.decode(‘us-ascii’).rstrip() for line in log)
正则表达式解析
使用正则表达式解析日志行,生成 NamedTuple 对象。
import re
format_pat = re.compile(r"(?P[\d.]+)\s+(?P\S+)\s+(?P\S+)\s+[(?P

class AccessDetails(NamedTuple):
access: Access
time: datetime.datetime
method: str
url: urllib.parse.ParseResult
protocol: str
referrer: urllib.parse.ParseResult
agent: Optional[AgentDetails]
属性说明:
access: 原始的 Access 对象。
time: 解析后的时间字符串。
method, url, protocol: 通过分解 access.request 字段得到。
referrer: 解析后的 URL。
agent: 可分解为更细粒度的字段。
AgentDetails 类
class AgentDetails(NamedTuple):
product: str
system: str
platform_details_extensions: str
解析器函数
解析请求
def parse_request(request: str) -> Tuple[str, str, str]:
words = request.split()
return words[0], ’ '.join(words[1:-1]), words[-1]
解析时间
def parse_time(ts: str) -> datetime.datetime:
return datetime.datetime.strptime(ts, “%d/%b/%Y:%H:%M:%S %z”)
解析用户代理
import re

agent_pat = re.compile(
r"(?P\S*?)\s+"
r"((?P.?))\s"
r"(?P<platform_details_extensions>.*)"
)

def parse_agent(user_agent: str) -> Optional[AgentDetails]:
agent_match = agent_pat.match(user_agent)
if agent_match:
return AgentDetails(**agent_match.groupdict())
return None
构建 AccessDetails 实例
from typing import Iterable, Iterator

def access_detail_iter(access_iter: Iterable[Access]) -> Iterator[AccessDetails]:
for access in access_iter:
try:
meth, url, protocol = parse_request(access.request)
yield AccessDetails(
access=access,
time=parse_time(access.time),
method=meth,
url=urllib.parse.urlparse(url),
protocol=protocol,
referrer=urllib.parse.urlparse(access.referer),
agent=parse_agent(access.user_agent)
)
except ValueError as e:
print(e, repr(access))
过滤访问细节
path_filter 函数
def path_filter(access_details_iter: Iterable[AccessDetails]) -> Iterable[AccessDetails]:
name_exclude = {
‘favicon.ico’, ‘robots.txt’, ‘index.php’, ‘humans.txt’,
‘dompdf.php’, ‘crossdomain.xml’,
‘_images’, ‘search.html’, ‘genindex.html’,
‘searchindex.js’, ‘modindex.html’, ‘py-modindex.html’,
}
ext_exclude = {
‘.png’, ‘.js’, ‘.css’,
}
for detail in access_details_iter:
path = detail.url.path.split(‘/’)
if not any(path):
continue
if any(p in name_exclude for p in path):
continue
final = path[-1]
if any(final.endswith(ext) for ext in ext_exclude):
continue
yield detail
分析访问细节
book_filter 函数
def book_filter(access_details_iter: Iterable[AccessDetails]) -> Iterator[AccessDetails]:
def book_in_path(detail: AccessDetails) -> bool:
path = tuple(
item
for item in detail.url.path.split(‘/’)
if item
)
return path[0] == ‘book’ and len(path) > 1
return filter(book_in_path, access_details_iter)
归约函数
from collections import Counter

def reduce_book_total(access_details_iter: Iterable[AccessDetails]) -> Dict[str, int]:
counts: Dict[str, int] = Counter()
for detail in access_details_iter:
counts[detail.url.path] += 1
return counts
完整分析过程
def analysis(filename: str) -> Dict[str, int]:
“”“Count book chapters in a given log”“”
details = path_filter(
access_detail_iter(
access_iter(
local_gzip(filename))))
books = book_filter(details)
totals = reduce_book_total(books)
return totals
多进程池进行并发处理
import multiprocessing

with multiprocessing.Pool(4) as workers:
workers.map(analysis, glob.glob(pattern))
过程说明:创建一个含4个独立进程的Pool对象,并将任务分配给进程池中的各个进程。
🐍 进程与多进程池
进程与子进程
“进程是计算机中独立执行的程序,而子进程是由父进程创建的一个或多个进程。”
一个进程通常由父进程和多个子进程组成。
父进程在子进程池启动后几乎无事可做,这通常是有效的。
通常将worker(工作进程)分配到单独的CPU(或计算核心),而父进程与Pool对象中的一个子进程共享一个CPU。
僵尸进程与上下文管理器
常规的Linux父/子进程规则适用于由该模块创建的子进程。
如果父进程在收集子进程状态之前崩溃,可能会留下运行中的僵尸进程。
进程的Pool对象作为上下文管理器使用,确保在上下文结束后子进程适时终止。
创建多进程池
默认情况下,使用multiprocessing.Pool()创建多个worker,数量由multiprocessing.cpu_count()决定。
在某些情况下,创建比CPU个数更多的worker是有益的,尤其是进行I/O密集型处理时。
特点 描述
CPU个数 multiprocessing.cpu_count()返回的值通常是最优的
I/O密集型处理 允许多个worker进程等待I/O完成,缩短应用程序运行时间
进程池中的映射方法
多进程的Pool对象有四个主要的map()方法:map()、imap()、imap_unordered()和starmap()。
这些函数负责在进程池中分配任务,分配任务和收集结果的方式不同。
map() 方法
map(function, iterable)方法将迭代项分配给每个worker,结果按分配顺序收集。
imap() 方法
imap(function, iterable)方法比map()懒惰,默认将每个迭代项发送给下一个可用的worker。
imap_unordered() 方法
imap_unordered(function, iterable)方法与imap()类似,但不会保留结果的顺序。
starmap() 方法
starmap(function, iterable)方法将每个迭代项作为元组传递给函数,采用修饰符。
实际示例
import multiprocessing
pattern = "
.gz"
combined = Counter()
with multiprocessing.Pool() as workers:
result_iter = workers.imap_unordered(analysis, glob.glob(pattern))
for result in result_iter:
combined.update(result)
该示例创建了一个Counter()函数合并进程池中每个worker的结果。
性能考量
使用多个并发进程可大幅缩减分析日志的用时。
单进程基准运行时间为150秒,而使用多个进程的运行时间为68秒。
其他方法
apply() 方法
apply(function, *args, **kw)方法向工作池传递值,实际上是map()方法的封装。
异步方法
map_async()、starmap_async()等函数负责将任务分配给Pool中的子进程。
这些函数返回一个可查询的对象,用于获取子进程的结果。
更复杂的多进程架构
multiprocessing包支持各种架构,包括跨多个服务器的多进程结构。
可以使用队列和管道将对象从一个进程传递到另一个进程,也可以在进程间共享内存。
设计并发处理
函数式编程方法
使用multiprocessing.Pool、concurrent.futures.ProcessPoolExecutor和concurrent.futures.ThreadPoolExecutor等方法。
方法 描述
multiprocessing.Pool 处理并发任务的常用方法
concurrent.futures.ProcessPoolExecutor 类似于multiprocessing.Pool,但API更简单
concurrent.futures.ThreadPoolExecutor 在单个进程中创建线程池,适合处理I/O密集型任务
性能比较
使用concurrent.futures的线程池与进程池之间存在显著的性能差异,尤其在I/O密集型任务中。
方式 运行时间
线程池 168秒
进程池 68秒
多线程可能更适合处理用户界面,而对于I/O密集型任务,进程池通常表现更佳。
总结
本章介绍了支持多个数据并发处理的两种主要方法:使用multiprocessing模块和concurrent.futures模块。针对函数式编程的应用,建议使用concurrent.futures模块,因为它更适合编写并发函数式应用程序。
🧮 条件表达式与运算符模块
正确性的重要性
“要设计的算法是正确的,这一点便无关紧要。”
字典重复键的处理
在字典中,最后一个值会替换任何先前的值。例如:

{‘a’: 1, ‘a’: 2}
{‘a’: 2}
在此情况下,无须在意需要保留哪一个重复键。
非严格最大值函数
定义
下面是 max() 函数的一个简化版本,它选取两个值中较大的那个:
def non_strict_max(a, b):
f = {a >= b: lambda: a,
b >= a: lambda: b}[True]
return f()
对于 a==b 的情况,字典中的两项都会得到条件为 True 的键,但其中只有一个会保留下来,这并不重要。
类型提示
该函数的正规类型提示非常复杂。进行比较的项必须是可排序的,即它们必须实现运算符排序。下面定义了一个适用于可排序对象的类型:
from abc import ABCMeta, abstractmethod
from typing import TypeVar, Any

class Rankable(metaclass=ABCMeta):
@abstractmethod
def lt(self, other: Any) -> bool: …
@abstractmethod
def gt(self, other: Any) -> bool: …
@abstractmethod
def le(self, other: Any) -> bool: …
@abstractmethod
def ge(self, other: Any) -> bool: …

RT = TypeVar(‘RT’, bound=Rankable)
类 Rankable 的定义是一个抽象类,使用 abc 模块将抽象特性具象化。
函数定义
non_strict_max() 函数的参数类型和返回值类型绑定为类型变量 RT:
def non_strict_max(a: RT, b: RT) -> RT:
这说明参数 a 和 b 应是可排序的类型,并被指定为 RT 类型。
过滤 True 条件表达式
使用 filter() 函数的变体
确定哪个表达式的结果为 True 有多种方法。以下是使用 filter() 函数编写的另一种变体:
from operator import itemgetter

def semifact(n: int) -> int:
alternatives = [
(n == 0, lambda n: 1),
(n == 1, lambda n: 1),
(n == 2, lambda n: 2),
(n > 2, lambda n: semifact(n-2)*n)
]
_, f = next(filter(itemgetter(0), alternatives))
return f(n)
这将所有备选方案定义为条件和函数对的一组序列。
类型提示
变量赋值语句中还可以包含类型提示:
alternatives: List[Tuple[bool, Callable[[int], int]]] = [

]
这里的定义阐明了元组列表包含一个布尔值和一个可调用函数。
匹配模式的寻找
使用正则表达式
上述创建多个条件集合的技术也可以配合正则表达式使用:
import re
p1 = re.compile(r"(some) pattern")
p2 = re.compile(r"a (different) pattern")

from typing import Optional, Match
def matcher(text: str) -> Optional[Match[str]]:
patterns = [p1, p2]
matching = (p.search(text) for p in patterns)
try:
good = next(filter(None, matching))
return good
except StopIteration:
pass
该函数会构建一系列可选模式,并应用于给定的文本块。
使用运算符模块代替匿名函数
使用 itemgetter() 函数
在使用 max()、min() 和 sorted() 函数时,可使用 operator 模块中的 itemgetter() 函数来代替匿名函数。示例:
from operator import itemgetter

year_cheese = [
(2000, 29.87), (2001, 30.12), (2002, 30.6),
(2003, 30.66), (2004, 31.33), (2005, 32.62),
(2006, 32.73), (2007, 33.5), (2008, 32.84),
(2009, 33.02), (2010, 32.92)
]

min_cheese = min(year_cheese, key=itemgetter(1))
max_cheese = max(year_cheese, key=itemgetter(1))
使用命名元组
可以使用 NamedTuple 来提供更清晰的数据结构:
from typing import NamedTuple

class YearCheese(NamedTuple):
year: int
cheese: float

year_cheese_2 = [YearCheese(*yc) for yc in year_cheese]
运算符的星号映射
使用 itertools.starmap()
函数 itertools.starmap() 是 map() 的变体,能够将函数应用于序列中的每一项。例如:
from itertools import starmap
from itertools import zip_longest

d = starmap(pow, zip_longest([], range(4), fillvalue=60))
计算分数序列的和
可以对一个潜在的无穷序列进行求和:
from itertools import count, takewhile

num = map(fact, count())
den = map(semifact, (2*n+1 for n in count()))
terms = takewhile(lambda t: t > 1E-10, map(truediv, num, den))
result = 2 * sum(terms)
使用 functools.reduce() 函数进行归约
定义求和和乘积函数
可以使用 functools.partial() 来创建部分求值的 reduce() 函数版本:
import functools
import operator

sum = functools.partial(functools.reduce, operator.add)
prod = functools.partial(functools.reduce, operator.mul)
定义阶乘函数
使用 prod() 函数可以定义阶乘:
fact = lambda n: 1 if n < 2 else n * prod(range(1, n))
小结
本章介绍了条件表达式的不同实现版本,以及如何使用运算符模块中的高阶函数来简化匿名函数的使用。
📦 PyMonad 库
14.1 下载和安装
根据 Python 包索引(PyPI)可以找到 PyMonad 包。使用 pip 将 PyMonad 加入运行环境。
访问 PyMonad 以获取更多信息。
对于 macOS 和 Linux 开发人员,命令 pip install pymonad 可能需要以 sudo 命令为前缀:
如果安装的是个人版的 Python,无需使用 sudo。
如果安装的是系统级的 Python,则需要使用 sudo。在执行 sudo pip install pymonad 这样的命令时,系统会提示输入密码,以确保拥有执行安装所需的管理权限。
对于 Windows 开发人员,不存在 sudo 命令,但也必须拥有足够的管理权限。
安装完 PyMonad 包后,可以使用以下命令进行确认:
import pymonad
help(pymonad)
该命令会显示 docstring 模块,确认已正确安装 PyMonad 包。
注意:整个项目的名称 PyMonad 使用了大小写混合的形式。在导入已安装的 Python 包名 “pymonad” 时,使用的是小写。
14.2 函数式复合和柯里化
柯里化 是将多参数函数转化为单参数函数集合的过程,以逻辑学家 Haskell Curry 的名字命名。
柯里化示例:将函数 f(x,y)→z 转化为两个函数 fc1(x)→fc2(y) 和 fc2(y)→z。
在 Python 中可以求解一个柯里化函数:f_c(2)(3)。
示例
from pymonad import curry

@curry
def systolic_bp(bmi, age, gender_male, treatment):
return (68.15 + 0.58 * bmi + 0.65 * age + 0.94 * gender_male + 6.44 * treatment)
该函数基于多元回归模型,预测血压。相关示例:

systolic_bp(25, 50, 1, 0) # 116.09
systolic_bp(25, 50, 0, 1) # 121.59
可以创建柯里化函数以部分求值:
treated = systolic_bp(25, 50, 0)
treated(0) # 115.15
treated(1) # 121.59
高阶函数的柯里化
functools.reduce() 函数是可柯里化的,示例:
from collections.abc import Sequence
from pymonad import curry

@curry
def myreduce(function, iterable_or_sequence):
if isinstance(iterable_or_sequence, Sequence):
iterator = iter(iterable_or_sequence)
else:
iterator = iterable_or_sequence
s = next(iterator)
for v in iterator:
s = function(s, v)
return s
示例:
from operator import add
sum = myreduce(add)
sum([1, 2, 3]) # 6
14.3 函数式复合和 PyMonad * 运算符
柯里化函数的一个重要价值是可以通过函数式复合来组合函数。
示例
import operator
prod = myreduce(operator.mul)

@curry
def alt_range(n):
if n == 0:
return range(1, 2) # 仅值 [1]
elif n % 2 == 0:
return range(2, n + 1, 2)
else:
return range(1, n + 1, 2)

semi_fact = prod * alt_range
semi_fact(9) # 945
14.4 函子和应用型函子
函子 是简单数据的函数式表示。示例:
pi = lambda: 3.14
pi() # 3.14
对函子应用柯里化函数会创建一个新的柯里化函子。
Maybe 函子
Maybe 函子有两个子类:
Nothing
Just(某个简单值)
使用示例:

x1 = systolic_bp * Just(25) & Just(50) & Just(1) & Just(0)
x1.getValue() # 116.09
x2 = systolic_bp * Just(25) & Just(50) & Just(1) & Nothing
x2.getValue() is None # True
List 函子
List 函子是“懒惰”的,它不会对可迭代对象进行求值,示例:
List(range(10)) # [range(0, 10)]
通过使用 * 运算符,可以扩展生成器或范围对象的值。
fact = prod * range1n
seq1 = List(*range(20))
f1 = fact * seq1
f1[:10] # [1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880]
总结
以上内容涵盖了 PyMonad 库的安装、柯里化、函数式复合以及函子的基本概念和应用示例。
🐍 PyMonad 库与函数式编程
14.5 单子的 bind() 函数和 >> 运算符
“单子是函数式编程中的一个重要概念,它强制按顺序求值,从而使表达式的计算过程更加清晰。”
单子的定义
单子(Monad)是一个函数式编程的抽象概念,用于处理按顺序执行的计算。
在 Python 中,虽然不需要单子来强制求值顺序,但我们可以利用这一概念来清晰化复杂算法。
bind() 函数与 >> 运算符
bind() 函数用于连接多个操作,确保它们按顺序执行。
运算符是 bind() 的一种简便表示法。
示例表达式
使用 >> 运算符构造的表达式:
Just(some file)>>read header>>read next>>read next
等价于:
bind(bind(bind(Just(some file),read header),read next),read next)
14.6 模拟实现单子
单子的传递与返回
单子通过管道的方式传递,可以作为参数传递给函数,并返回相似的数据结构。
下面是一个模拟掷骰子游戏的示例。
掷骰子游戏规则
游戏分为两个阶段:
出骰子阶段:
如果点数是 7 或 11,投手赢得游戏。
如果点数是 2、3 或 12,投手输掉游戏。
其他点数(4、5、6、8、9 或 10)建立一个点,转入点骰子阶段。
点骰子阶段:
如果点数是 7,投手输掉游戏。
如果点数等于建立的点,投手赢得游戏。
其他点数继续进行。
随机数生成器
import random
def rng():
return (random.randint(1, 6), random.randint(1, 6))
上述函数生成一组骰子点数。
游戏函数实现
def craps():
outcome = (
Just((“”, 0, [])) >> come_out_roll(dice)
point_roll(dice)
)
print(outcome.getValue())
创建初始单子 Just((“”, 0, [])),并根据游戏结果生成结果单子 outcome。
出骰子与点骰子函数
@curry
def come_out_roll(dice, status):
d = dice()
if sum(d) in (7, 11):
return Just((“win”, sum(d), [d]))
elif sum(d) in (2, 3, 12):
return Just((“lose”, sum(d), [d]))
else:
return Just((“point”, sum(d), [d]))

@curry
def point_roll(dice, status):
prev, point, so_far = status
if prev != “point”:
return Just(status)
d = dice()
if sum(d) == 7:
return Just((“craps”, point, so_far+[d]))
elif sum(d) == point:
return Just((“win”, point, so_far+[d]))
else:
return (
Just((“point”, point, so_far+[d]))
>> point_roll(dice)
)
14.7 单子的其他特性
幺半群(Monoid)
“幺半群是一种数学结构,包括运算符和标识元素,数据元素在运算符下是封闭的。”
PyMonad 包含许多预定义的幺半群类,可以扩展以创建自定义类。
幺半群可以用于创建复杂值的数据结构,记录历史操作。
14.8 小结
本章介绍了如何在 Python 中使用 PyMonad 库来展现函数式编程的概念,包括柯里化、单子的实现及其在模拟中的应用。
通过函数式设计,Python 可以实现有用的程序,而不需引入状态化对象或严格有序求值。
🌐 Web 服务的函数式设计方法
函数嵌套与请求处理
“函数式设计允许通过嵌套函数的方式来处理Web请求,每个函数可以扩充请求或调整响应。”
函数嵌套示例
在处理用户身份认证的过程中,函数的嵌套结构如下所示:
response = content(
authentication(
csrf(session(headers, request, forms))
)
)
每个函数的输出可以作为下一个函数的输入。
例如,session() 函数使用请求的报头来判断会话是否存在。
csrf() 函数负责验证CSRF令牌,确保请求的有效性。
authentication() 函数会根据用户凭证返回相应的结果。
理想的函数结构
def session(headers, request, forms):
# 预处理: 确定会话
content = csrf(headers, request, forms)
# 后处理内容
return content

def csrf(headers, request, forms):
# 预处理: 验证CSRF令牌
content = authenticate(headers, request, forms)
# 后处理内容
return content
该结构展示了如何通过支持扩充输入或输出的嵌套函数集合来创建Web内容。
WSGI 标准
WSGI 概述
“WSGI(Web 服务网关接口)是一种标准化的设计模式,旨在简化Web请求的响应创建。”
WSGI 是大多数基于Python的Web服务器的通用框架。
参考实现可以在Python库中的 wsgiref 包中找到。
WSGI 应用程序接口
每个WSGI应用程序的接口如下:
def some_app(environ, start_response):
return content
environ 参数是一个字典,包含请求的所有参数,包括报头、请求方法、路径等。
start_response 参数是一个用于发送状态和响应报头的函数。
示例:静态应用程序
def static_app(environ: Dict, start_response: SR_Func) -> Union[Iterator[bytes], List[bytes]]:
log = environ[‘wsgi.errors’]
try:
# 处理静态文件
except IsADirectoryError:
# 处理目录请求
except FileNotFoundError:
# 返回404错误
该应用程序尝试打开请求路径的文件,并处理可能出现的异常。
异常处理与响应
处理请求中的异常
WSGI应用程序的每个阶段负责过滤请求,可以通过异常处理提前拒绝错误请求。
示例:静态应用程序异常处理
def static_app(environ: Dict, start_response: SR_Func) -> Union[Iterator[bytes], List[bytes]]:
try:
# 尝试读取文件
except IsADirectoryError:
return index_app(environ, start_response)
except FileNotFoundError:
start_response(‘404 NOT FOUND’, [])
return [f"Not Found {static_path}\n{e!r}“.encode(“utf-8”)]
通过捕获异常来处理不同类型的请求错误。
创建WSGI 应用程序
示例:基于REST的Web服务
该Web服务可处理数据请求,并返回不同格式的文件(如JSON、XML或CSV)。
def anscombe_app(environ: Dict, start_response: SR_Func) -> Iterable[bytes]:
try:
match = path_pat.match(environ[‘PATH_INFO’])
dataset = anscombe_filter(set_id, raw_data())
content_bytes, mime = serialize(query[‘form’][0], set_id, dataset)
headers = [(‘Content-Type’, mime), (‘Content-Length’, str(len(content_bytes)))]
start_response(“200 OK”, headers)
return [content_bytes]
except Exception as e:
# 处理异常
raw_data() 函数读取原始数据,anscombe_filter() 函数过滤数据,serialize() 函数将结果序列化为字节码。
URL 路径与查询字符串
使用URL路径来定义请求的资源:
path_pat = re.compile(r”^/anscombe/(?P.*?)/?$")
可以提取数据集的信息和输出格式。
总结
通过以上内容,可以看出函数式编程在Web服务设计中的重要性。使用WSGI标准和函数嵌套的方式,可以灵活地处理请求、验证用户身份、返回响应,从而构建出高效的Web应用程序。
📊 序列化与Web服务
序列化的数据创建字节对象
“序列化的数据创建字节对象,以便统计这些字节。”
如果选择忽略 Content-Length 报头,可以大规模地改变应用程序的结构。
可以将每个序列化器更改为生成器函数,从而在创建时生成字节对象。
对于大型数据集,优化可能有益,但对于关注下载进度的用户则可能不太友好,因为浏览器无法显示下载的完成情况。
事务分解与错误处理
事务分解
一种常见的优化是将事务分解为两部分:
第一步用于计算结果并将文件放入 Downloads 目录。
响应是一个带有 Location 报头的 302 FOUND,用于标识需下载的文件。
错误处理
将所有错误视为 404 NOT FOUND 错误会引起误导,因为可能是其他环节出了问题。
更复杂的错误处理会引入更多的 try / except 块,以提供更多信息反馈。
调试信息
出于调试目的,在生成的Web页面中提供了Python的栈跟踪。
在没有调试需求的环境中,这是一种糟糕的做法,来自API的反馈应只用于处理请求。
获取原始数据
函数定义
from Chapter_3.ch03_ex5 import (series, head_map_filter, row_iter)
from typing import (NamedTuple, Callable, List, Tuple, Iterable, Dict, Any)

RawPairIter = Iterable[Tuple[float, float]]

class Pair(NamedTuple):
x: float
y: float

pairs: Callable[[RawPairIter], List[Pair]] = lambda source: list(Pair(*row) for row in source)

def raw_data() -> Dict[str, List[Pair]]:
with open(“Anscombe.txt”) as source:
data = tuple(head_map_filter(row_iter(source)))
mapping = {
id_str: pairs(series(id_num, data))
for id_num, id_str in enumerate([‘I’, ‘II’, ‘III’, ‘IV’])
}
return mapping
函数 raw_data() 打开本地数据文件,并利用 row_iter() 函数将解析后的文件的每一行返回。
使用 head_map_filter() 函数删除文件的头部,结果创建了一个列表元组结构,并将其赋给 data 变量。
生成的结构是 NamedTuple 的子类 Pair 的实例,字段类型为 float。
选择特定序列
通过名称选取某个特定的序列,例如:

raw_data()[‘I’]
[Pair(x=10.0, y=8.04), Pair(x=8.0, y=6.95), …]
运用过滤器
过滤器函数
def anscombe_filter(set_id: str, raw_data_map: Dict[str, List[Pair]]) -> List[Pair]:
return raw_data_map[set_id]
将简单表达式转化为函数的原因:
函数表示法形式上更一致,更灵活。
易于扩展过滤器的功能。
可以在函数文档字符串中包含独立的单元测试。
错误处理与异常
该函数中未做任何错误处理,关注的是所谓的幸福之路(happy path)。
WSGI的封装函数应捕获所有异常并返回相应的状态信息和错误响应内容。
序列化结果
序列化概念
序列化是将Python数据转换为适合传输的字节流。
每种格式最好用一个简单的函数来描述。
序列化器字典
Serializer = Callable[[str, List[Pair]], bytes]
SERIALIZERS: Dict[str, Tuple[str, Serializer]] = {
‘xml’: (‘application/xml’, serialize_xml),
‘html’: (‘text/html’, serialize_html),
‘json’: (‘application/json’, serialize_json),
‘csv’: (‘text/csv’, serialize_csv),
}
序列化函数
def serialize(format: str, title: str, data: List[Pair]) -> Tuple[bytes, str]:
mime, function = SERIALIZERS.get(format.lower(), (‘text/html’, serialize_html))
return function(title, data), mime
函数 serialize() 负责在 SERIALIZERS 字典中定位特定的序列化器。
JSON与CSV格式的序列化
JSON序列化器
import json

@to_bytes
def serialize_json(series: str, data: List[Pair]) -> str:
obj = [dict(x=r.x, y=r.y) for r in data]
text = json.dumps(obj, sort_keys=True)
return text
CSV序列化器
import csv
import io

@to_bytes
def serialize_csv(series: str, data: List[Pair]) -> str:
buffer = io.StringIO()
wtr = csv.DictWriter(buffer, Pair._fields)
wtr.writeheader()
wtr.writerows(r._asdict() for r in data)
return buffer.getvalue()
XML格式的序列化
XML序列化器
import xml.etree.ElementTree as XML

def serialize_xml(series: str, data: List[Pair]) -> bytes:
doc = XML.Element(“series”, name=series)
for row in data:
row_xml = XML.SubElement(doc, “row”)
x = XML.SubElement(row_xml, “x”)
x.text = str(row.x)
y = XML.SubElement(row_xml, “y”)
y.text = str(row.y)
return cast(bytes, XML.tostring(doc, encoding=‘utf-8’))
HTML格式的序列化
HTML序列化器
import string

data_page = string.Template(“”"\

Series ${title}

Series ${title}

${rows}
x y
""")

@to_bytes
def serialize_html(series: str, data: List[Pair]) -> str:
text = data_page.substitute(
title=series,
rows=“\n”.join(
“{0.x}{0.y}”.format(row) for row in data)
)
return text
API 密钥的使用
API密钥用于验证访问、授权特定功能及跟踪使用情况。
创建API密钥的一种简单方法是使用加密随机数生成难以预测的密钥字符串。
示例代码
import secrets

secrets.token_urlsafe(18*size)
‘kzac-xQ-BB9Wx0aQoXRCYQxr’
使用URL安全的Base64编码,确保生成字符串中不会包含特殊字符。
小结
本章讨论了如何将函数式设计应用于基于REST的Web服务,介绍了WSGI标准及序列化方法。
🖥️ WSGI 应用程序与数据处理
函数封装与数据处理
“我们将这些函数封装在一个简单的、兼容WSGI(Web Server Gateway Interface)的应用程序中,以便将Web服务与提取和过滤数据的实际处理操作分离开来。”
WSGI 应用程序
WSGI 是一种用于Python Web应用程序与Web服务器之间的接口标准。
封装的函数包括:
raw_data()
anscombe_filter()
serialize()
输入有效性处理
Web服务函数假定所有输入都是有效的。
如果输入无效,普通的Python异常处理将会抛出异常。
WSGI封装函数会捕获错误并返回相应的状态码和错误内容。
数据上传与表单处理
“本章没有讨论与上传数据或接收表单数据来更新持久化数据存储等相关的复杂问题。”
上传数据和接收表单数据并不比获取数据和序列化结果复杂,但可以通过更巧妙的方式来解决。
对于简答的查询和数据共享,可以使用小型Web服务应用程序。
函数式设计模式
使用函数式设计模式可以确保网站代码简洁明了。
对于更复杂的Web应用程序,考虑使用能正确处理细节的框架。


🚀 优化与改进
记忆化与缓存
“许多算法可以受益于记忆化。”
记忆化的定义
记忆化是缓存函数结果以避免重复计算的技术。
常见的应用包括阶乘计算和斐波那契数列。
记忆化的示例
阶乘计算:
F(n)=n×F(n-1)
斐波那契数的递归版本包含两个尾递归调用,定义如下:
F(n)={■(0&“if " n=0@1&“if " n=1@F(n-1)+F(n-2)&“otherwise” )┤
Syracuse 函数
Syracuse 函数 S(n) 定义如下:
S(n)={■(n/2&“if " n” is even” @3n+1&“if " n” is odd” )┤
优化技术
使用 @lru_cache 装饰器
通过使用 @lru_cache 装饰器,可以快速实现记忆化。
二项式系数的计算
二项式 C(n,m) 的计算公式:
C(n,m)=n!/(m!(n-m)!)
处理浮点数与分数
对于计数的原始数据,使用 fractions 模块而非浮点数,可以获得更精确的结果。


📊 案例研究:卡方决策
卡方检验
“为了做出这个决策,需要计算一个预期分布,并将观察到的数据与预期进行比较。”
数据收集与分析
在生产质量保障过程中,将硅片缺陷的数据收集到数据库中。
使用 SQL 查询提取缺陷详情。
示例 SQL 查询
SELECT SHIFT, DEFECT_CODE, SERIAL_NUMBER FROM some tables;
使用 Counter 对象
利用 collections.Counter 进行缺陷计数。
函数实现
缺陷减少函数
from typing import TextIO
import csv
from collections import Counter
from types import SimpleNamespace

def defect_reduce(input_file: TextIO) -> Counter:
rdr = csv.DictReader(input_file)
assert set(rdr.fieldnames) == set([“defect_type”, “serial_number”, “shift”])
rows_ns = (SimpleNamespace(**row) for row in rdr)
defects = ((row.shift, row.defect_type) for row in rows_ns if row.defect_type)
tally = Counter(defects)
return tally
计算预期缺陷
汇总数据并计算每种缺陷的预期值,以便进行卡方检验。


以上内容涵盖了WSGI应用程序的数据处理、记忆化与缓存的优化技术,以及卡方检验的案例研究,确保学生能充分理解并应用这些概念。
📊 数据处理与统计分析
数据读取与转换
“为了对数据进行有效处理,首先需要读取并解析数据源。”
CSV 数据读取
在数据处理的过程中,我们使用了 csv.DictReader() 函数来解析原始的 CSV 数据。该函数的使用包括以下几个关键步骤:
打开文件:使用 csv.DictReader() 需要一个打开的文件作为输入。
确保数据完整性:通过 assert 语句确保文件包含预期的数据字段,例如:
assert set(rdr.fieldnames) == set([“defect_type”, “serial_number”, “shift”])
数据转换
数据行会被转换为匿名对象,并构建二元组,形式如下: - 键:由轮换(shift)和缺陷代码(defect_code)构成。 - 值:经过整型转换的计数。
生成的结果序列形式为:((shift,defect),count)。
Counter 对象的使用
创建与求和
创建 Counter 对象以便于统计和组合数据:
使用 Counter(dict(convert)) 创建计数器对象。
统计缺陷总数:
total=“sum(defects.values())”
计算样本集中缺陷的总数,例如 309 个缺陷。
按轮换和类型汇总
可以通过以下方式提取和汇总数据:
按轮换汇总:
shift_totals = sum(
(Counter({s: defects[s, d]}) for s, d in defects),
Counter() # 初始值为空的Counter对象
)
按类型汇总:
type_totals = sum(
(Counter({d: defects[s, d]}) for s, d in defects),
Counter() # 初始值为空的Counter对象
)
示例数据
按轮换的结果示例:
Counter({‘3’: 119, ‘2’: 96, ‘1’: 94})
按缺陷类型的结果示例:
Counter({‘C’: 128, ‘A’: 74, ‘B’: 69, ‘D’: 38})
概率计算
缺陷概率
为了计算缺陷的概率,使用 Fraction 对象保持精度:
轮换的概率:
P_shift = {shift: Fraction(shift_totals[shift], total) for shift in sorted(shift_totals)}
缺陷类型的概率:
P_type = {type: Fraction(type_totals[type], total) for type in sorted(type_totals)}
示例概率值
轮换概率示例:
{‘1’: Fraction(94, 309), ‘2’: Fraction(32, 103), ‘3’: Fraction(119, 309)}
类型概率示例:
{‘A’: Fraction(74, 309), ‘B’: Fraction(23, 103), ‘C’: Fraction(128, 309), ‘D’: Fraction(38, 309)}
计算期望值与列联表
期望值计算
期望值是轮换与缺陷类型概率的乘积,计算代码如下:
expected = {
(s, t): P_shift[s] * P_type[t] * total
for t in P_type
for s in P_shift
}
列联表的生成
生成列联表的代码负责输出观测值和预期值的对比:
print("obs exp "len(type_totals))
for s in sorted(shift_totals):
pairs = [
f"{defects[s,t]:3d} {float(expected[s,t]):5.2f}"
for t in sorted(type_totals)
]
print(f"{’ '.join(pairs)} {shift_totals[s]:3d}")
最后一行输出缺陷类型总数和总计数量。
卡方检验
卡方值的计算
卡方值的计算公式为:
χ2=∑_s▒∑_t^▒((o_st-e_st )^2)/e_st
其中 o 为观测值,e 为期望值。
diff = lambda e, o: (e - o)**2 / e
chi2 = sum(
diff(expected[s, t], defects[s, t])
for s in shift_totals
for t in type_totals
)
自由度与阈值
自由度 df=(r-1)(c-1),在本例中为 6。常用的阈值为 12.5916,低于此值的概率认为是随机的。
结论
通过计算卡方值并与阈值比较,可以判断数据是否存在显著性差异,进而决定是否需要进一步研究。
📊 计算期望值并显示列联表
Gamma_Half 函数
函数定义
def Gamma_Half(k: Union[int, Fraction]) -> Union[int, Fraction]:
参数:k 可以是整数或分数(Fraction)。
返回值:返回对应的伽马函数值。
函数逻辑
整数输入:
如果 k 是整数,使用 fact(k-1) 计算。
分数输入:
如果 k 是分数且分母为1,使用 fact(k-1)。
如果分母为2,需要使用更复杂的闭合形式:
(fact(2n))/(4^n⋅fact(n))⋅√π
代码示例:
if isinstance(k, int):
return fact(k-1)
elif isinstance(k, Fraction):
if k.denominator == 1:
return fact(k-1)
elif k.denominator == 2:
n = k - Fraction(1, 2)
return fact(2
n)/(Fraction(4**n)*fact(n))*sqrt_pi
测试用例
示例:
Γ(2)=1
Γ(5)=24
对于分数,使用 Fraction 进行计算:
g = Gamma_Half(Fraction(3, 2))
g.limit_denominator(2_000_000)
返回值为 Fraction(291270, 328663)。
计算随机分布的概率
cdf 函数
def cdf(x: Union[Fraction, float], k: int) -> Fraction:
参数:
x:卡方值,计算公式为 ∑(obs[i]-exp[i])^2/exp[i]。
k:自由度,通常为 len(data) - 1。
函数逻辑
CDF 计算:
“CDF”(x)=1-γ(k/2,x/2)/(Γ_H alf(k/2) )
使用示例
调用示例:
round(float(cdf(0.004, 1)), 2) # 0.95
cdf(0.004, 1).limit_denominator(100) # Fraction(94, 99)
round(float(cdf(10.83, 1)), 3) # 0.001
cdf(10.83, 1).limit_denominator(1000) # Fraction(1, 1000)
示例数据
χ² 表示例: | χ² 值 | 自由度 | 计算结果 | 限制分母后的结果 | |———|——–|——————-|———————| | 0.004 | 1 | 0.95 | Fraction(94, 99) | | 10.83 | 1 | 0.001 | Fraction(1, 1000) | | 3.94 | 10 | 0.95 | Fraction(19, 20) | | 29.59 | 10 | 0.001 | Fraction(8, 8005) |
函数式编程设计模式
常见设计模式
柯里化:使用 functools.partial() 创建新函数。
闭包:返回一个包含外部函数变量的函数。
纯函数:无状态函数,强调避免使用全局变量。
高阶函数:使用其他函数的内置函数,如 map()、filter() 等。
惰性求值:如生成器表达式,延迟计算直到需要值时。
单子:用于明确排序的语法,适用于复杂表达式。
其他技术
尾递归转换:将递归转换为 for 循环以提高性能。
可迭代函数:使用 yield from 创建可迭代集合。
小结
本章讨论了优化技术,包括选择合适的算法和数据结构、使用记忆化缓存结果,以及使用可迭代对象替代大型实例化数据对象。
介绍了不完全伽马函数和完全伽马函数的计算方法,以及如何通过函数式编程实现简洁高效的代码。

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐