/ 数据复制服务 drs/ 实时同步/ / 通过创建事件触发器和函数实现postgresql增量ddl同步
更新时间:2024-11-25 gmt 08:00

通过创建事件触发器和函数实现postgresql增量ddl同步-九游平台

本小结介绍postgresql->rds for postgresql实时同步,通过在源库创建事件触发器和函数获取源库的ddl信息,然后在drs增量实时同步阶段实现ddl操作的同步。

前提条件

  • 当前支持的ddl操作包含如下:
    • 表级同步支持:truncate(仅postgresql 11及以上版本支持)、drop table 、alter table(包含add column、drop column、alter column、rename column、add constraint、drop constraint、rename)、comment on column、comment on table。
    • 库级同步支持:truncate(仅postgresql 11及以上版本支持)、create schema/table、drop table 、alter table(包含add column、drop column、alter column、rename column、add constraint、drop constraint、rename)、create sequence、drop sequence、alter sequence、create index、alter index、drop index、create view、alter view、comment on column、comment on table、comment on schema、comment on sequence、comment on index、comment on view。
    • 表级同步:rename表名之后,向更改名称后的表插入新的数据时,drs不会同步新的数据到目标库。
    • 库级同步:源库使用非create table方式创建的表不会同步到目标库。常见地如:使用create table as创建表、调用函数创建表。
    • 暂不支持以注释开头的ddl语句的同步,以注释开头的ddl语句将被忽略。
    • 不支持函数和存储过程中ddl语句的同步,函数和存储过程中执行的ddl语句将被忽略。
  • 源库和目标库版本不同时,请使用源库和目标库都兼容的sql语句执行ddl操作。例如:源库为pg11,目标库为pg12,要将源库表的列类型从char修改为int时,请使用如下语句:
    alter table tablename alter column columnname type int using columnname::int;
  • 执行如下操作步骤前,请检查待同步的源数据库public模式下,是否存在名为hwdrs_ddl_info的表、名为hwdrs_ddl_function()的函数、名为hwdrs_ddl_event的触发器。如存在,请将其删除。
  • 库级同步时,如创建无主键表,请执行如下命令,将无主键表复制属性设置为full。
    alter table tablename replica identity full;

操作步骤

  • 如果源库为其他云上或自建postgresql,执行以下步骤:
    1. 使用拥有创建事件触发器权限的用户连接要同步的数据库。
    2. 执行如下语句,创建存储ddl信息的表。
      drop table if exists public.hwdrs_ddl_info;
      drop sequence if exists public.hwdrs_ddl_info_id_seq;
      create table public.hwdrs_ddl_info(
        id                             bigserial primary key,
        ddl                            text,
        username                       varchar(64) default current_user, 
        txid                           varchar(16) default txid_current()::varchar(16),
        tag                            varchar(64), 
        database                       varchar(64) default current_database(), 
        schema                         varchar(64) default current_schema,
        client_address                 varchar(64) default inet_client_addr(),
        client_port                    integer default inet_client_port(),
        event_time                     timestamp default current_timestamp
      );
    3. 执行如下语句,创建函数。
      create or replace function public.hwdrs_ddl_function()
          returns event_trigger
          language plpgsql
          security invoker
      as $$
          declare ddl text;
          declare real_num int;
          declare max_num int := 50000;
      begin
        if (tg_tag in ('create table','alter table','drop table','create schema','create sequence','alter sequence','drop sequence','create view','alter view','drop view','create index','alter index','drop index','comment')) then
            select current_query() into ddl; 
            insert into public.hwdrs_ddl_info(ddl, username, txid, tag, database, schema, client_address, client_port, event_time)
            values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema,  inet_client_addr(), inet_client_port(), current_timestamp);
            select count(id) into real_num from public.hwdrs_ddl_info;
            if real_num > max_num then
              if current_setting('server_version_num')::int<100000 then
                delete from public.hwdrs_ddl_info where id<(select min(id) 1000 from public.hwdrs_ddl_info) and not exists (select 0 from pg_locks l join pg_database d on l.database=d.oid where d.datname=current_catalog and pid<>pg_backend_pid() and locktype='relation' and relation=to_regclass('public.hwdrs_ddl_info_pkey')::oid and mode='rowexclusivelock');
              else 
                delete from public.hwdrs_ddl_info where id<(select min(id) 1000 from public.hwdrs_ddl_info) and (xmax=0 or coalesce(txid_status(xmax::text::bigint), '')<>'in progress');
              end if;
            end if;
        end if;
      end;
      $$;
    4. 执行以下语句,为23中创建的对象赋予必要权限。
      grant usage on schema public to public;
      grant select,insert,delete on table public.hwdrs_ddl_info to public;
      grant select,usage on sequence public.hwdrs_ddl_info_id_seq to public;
      grant execute on function public.hwdrs_ddl_function() to public;
    5. 执行以下语句,创建ddl事件触发器。
      create event trigger hwdrs_ddl_event on ddl_command_end execute procedure public.hwdrs_ddl_function();
    6. 执行以下语句,将创建的事件触发器设置为enable。
      alter event trigger hwdrs_ddl_event enable always;
    7. 返回数据复制服务控制台,创建postgresql->rds for postgresql的同步任务。
    8. 待同步任务结束后,请执行下语句删除创建的表、函数、触发器。
      drop event trigger hwdrs_ddl_event;
      drop function public.hwdrs_ddl_function();
      drop table public.hwdrs_ddl_info;
  • 如果源库为rds for postgresql,执行以下步骤:
    1. 执行如下语句,清理已经创建过的对象。
      drop event trigger if exists hwdrs_ddl_event;
      drop function if exists public.hwdrs_ddl_function();
      drop table if exists public.hwdrs_ddl_info;
    2. 使用root用户执行如下语句,创建ddl插件。
      select control_extension('create', 'rds_hwdrs_ddl');
    3. 执行如下语句,更新函数。
      create or replace function public.hwdrs_ddl_function()
          returns event_trigger
          language plpgsql
          security invoker
      as $body$
          declare ddl text;
          declare real_num int;
          declare max_num int := 50000;
      begin
        if (tg_tag in ('create table','alter table','drop table','create schema','create sequence','alter sequence','drop sequence','create view','alter view','drop view','create index','alter index','drop index','comment')) then
            select current_query() into ddl; 
            insert into public.hwdrs_ddl_info(ddl, username, txid, tag, database, schema, client_address, client_port, event_time)
            values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema,  inet_client_addr(), inet_client_port(), current_timestamp);
            select count(id) into real_num from public.hwdrs_ddl_info;
            if real_num > max_num then
              if current_setting('server_version_num')::int<100000 then
                delete from public.hwdrs_ddl_info where id<(select min(id) 1000 from public.hwdrs_ddl_info) and not exists (select 0 from pg_locks l join pg_database d on l.database=d.oid where d.datname=current_catalog and pid<>pg_backend_pid() and locktype='relation' and relation=to_regclass('public.hwdrs_ddl_info_pkey')::oid and mode='rowexclusivelock');
              else 
                delete from public.hwdrs_ddl_info where id<(select min(id) 1000 from public.hwdrs_ddl_info) and (xmax=0 or coalesce(txid_status(xmax::text::bigint), '')<>'in progress');
              end if;
            end if;
        end if;
      end;
      $body$;
    4. 返回数据复制服务控制台,创建postgresql->rds for postgresql的同步任务。
    5. 待同步任务结束后,请执行下语句删除创建的表、函数、触发器。
      select control_extension('drop', 'rds_hwdrs_ddl');

相关文档

网站地图