Skip to content

myxiaoao/postgre-logical-replication

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

PostgreSQL 逻辑复制解析器

这个 PHP 库提供了一个简单易用的方式来监听和处理 PostgreSQL 数据库的变更,使用逻辑复制(Logical Replication)功能。它使用 wal2json 插件解析 PostgreSQL 的逻辑复制输出,并将其转换为易于使用的 PHP 数组,方便开发者实现各种 CDC(变更数据捕获)应用场景。

功能特点

  • 自动设置 PostgreSQL 逻辑复制槽和发布
  • 使用 wal2json 插件解析 PostgreSQL 的逻辑复制输出为 JSON 格式
  • 支持插入、更新、删除等操作的解析
  • 自动重连和错误处理
  • 可定制的日志记录
  • 信号处理(优雅关闭)

系统要求

  • PHP 8.0+
  • PostgreSQL 10+(启用了逻辑复制功能和安装了 wal2json 插件)
  • PHP 扩展:
    • pdo
    • pdo_pgsql
    • pgsql
    • pcntl(可选,用于信号处理)
  • Monolog 2.0+(用于日志记录)

安装

通过 Composer 安装:

composer require cooper/postgre-cdc

配置 PostgreSQL

确保 PostgreSQL 已启用逻辑复制功能,并安装了 wal2json 插件。在 postgresql.conf 中设置:

wal_level = logical         # 启用逻辑复制
max_replication_slots = 5   # 复制槽数量
max_wal_senders = 5         # 并发复制连接数
shared_preload_libraries = 'wal2json'  # 加载插件

然后重启 PostgreSQL 服务器。

基本用法

<?php

require_once 'vendor/autoload.php';

use Cooper\PostgreCDC\PostgreLogicalReplication;

// 数据库配置
$dbConfig = [
    'host' => 'localhost',
    'port' => '5432',
    'dbname' => 'your_database',
    'user' => 'your_username',
    'password' => 'your_password',
    'replication_slot_name' => 'my_replication_slot',
    'publication_name' => 'my_publication'
];

// 创建复制实例
$replication = new PostgreLogicalReplication($dbConfig);

// 连接数据库
if (!$replication->connect()) {
    die("无法连接到 PostgreSQL 数据库\n");
}

// 设置复制
if (!$replication->setupReplication()) {
    die("无法设置复制环境\n");
}

// 定义变更处理回调函数
$handleChange = function($data, $rawJsonData = null) {
    // 根据变更类型处理数据
    if (isset($data['change'])) {
        foreach ($data['change'] as $change) {
            $kind = $change['kind'] ?? '';
            $table = $change['table'] ?? '';
            
            switch ($kind) {
                case 'insert':
                    echo "插入操作: 表 {$table}\n";
                    if (isset($change['columnvalues'])) {
                        print_r($change['columnvalues']);
                    }
                    break;
                    
                case 'update':
                    echo "更新操作: 表 {$table}\n";
                    if (isset($change['columnvalues'])) {
                        echo "新值:\n";
                        print_r($change['columnvalues']);
                    }
                    if (isset($change['oldkeys'])) {
                        echo "旧键值:\n";
                        print_r($change['oldkeys']);
                    }
                    break;
                    
                case 'delete':
                    echo "删除操作: 表 {$table}\n";
                    if (isset($change['oldkeys'])) {
                        print_r($change['oldkeys']);
                    }
                    break;
            }
        }
    }
};

// 开始监听变更
try {
    $replication->startReplication($handleChange);
} catch (Exception $e) {
    echo "错误: " . $e->getMessage() . "\n";
} finally {
    $replication->close();
}

配置选项

数据库配置

参数 描述 默认值
host PostgreSQL 服务器主机 -
port PostgreSQL 服务器端口 -
dbname 数据库名称 -
user 用户名 -
password 密码 -
replication_slot_name 复制槽名称 php_logical_slot
publication_name 发布名称 php_publication
application_name 应用名称 php_logical_replication

方法

方法 描述
connect() 连接到 PostgreSQL 数据库
setupReplication() 设置复制环境(创建复制槽和发布)
startReplication(callable $callback) 开始监听变更数据
close() 关闭连接
setHeartbeatInterval(int $seconds) 设置心跳间隔(秒)
setMaxReconnectAttempts(int $attempts) 设置最大重连次数
setReconnectDelay(int $seconds) 设置重连延迟(秒)
recreateReplicationSlot() 重新创建复制槽

wal2json 输出格式

wal2json 插件输出的 JSON 数据格式如下:

插入操作

{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "users",
      "columnnames": ["id", "name", "email"],
      "columntypes": ["integer", "character varying(255)", "character varying(255)"],
      "columnvalues": [1, "张三", "zhangsan@example.com"]
    }
  ]
}

更新操作

{
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "users",
      "columnnames": ["id", "name", "email"],
      "columntypes": ["integer", "character varying(255)", "character varying(255)"],
      "columnvalues": [1, "张三", "zhangsan_new@example.com"],
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["integer"],
        "keyvalues": [1]
      }
    }
  ]
}

删除操作

{
  "change": [
    {
      "kind": "delete",
      "schema": "public",
      "table": "users",
      "oldkeys": {
        "keynames": ["id"],
        "keytypes": ["integer"],
        "keyvalues": [1]
      }
    }
  ]
}

关于 wal2json 插件

wal2json 是 PostgreSQL 的一个逻辑解码输出插件,它将 WAL(预写式日志)中的变更转换为 JSON 格式。这使得处理和消费这些变更变得更加容易,特别是对于需要与其他系统集成的应用程序。

wal2json 支持以下功能:

  • 将 INSERT、UPDATE、DELETE 操作转换为 JSON 格式
  • 支持事务边界(开始和提交)
  • 提供列名、类型和值
  • 提供主键信息
  • 支持多种配置选项,如时间戳、模式名称等

更多关于 wal2json 的信息,请访问 wal2json GitHub 仓库

许可证

The MIT License (MIT). Please see License File for more information.

About

PostgreSQL Change Data Capture using logical replication

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages