Accommodate HAF extension for pg_dump/pg_restore

mkrzeminski 2022-11-18 22:34:39 +01:00
parent 413b9f1851
commit c3b9982d44
20 changed files with 233 additions and 79 deletions

View File

@ -355,18 +355,34 @@ docker exec haf-instance-5M psql -d <your_database> \
-c "drop schema if exists pghero cascade; drop extension if exists pg_stat_statements;"
```
# Using pg_dump/pg_restore to backup/restore a HAF database
When setting up a new HAF server for production or testing purposes, you may want to load data from an existing HAF database using `pg_dump` and `pg_restore` commands, instead of filling it from scratch using hived with a replay of the blockchain data in a block_log.
# Using scripts to perform a full backup
Backing up the HAF Postgres database alone is not enough for a full backup;
the hived server state must be saved as well.
To perform the full backup:
1. Stop the hived server
2. Execute ```dump_instance.sh```
One problem that arises is that pg_dump doesn't dump tables which are part of a postgres extension, and some tables in HAF are associated with the hive_fork_manager extension. So to use pg_dump to dump a HAF database, you must first temporarily disassociate these tables from the extension, then reassociate them after completing the dump of the database.
Tables can be temporarily disassociated from an extension using the command:
`ALTER EXTENSION <extension name> DROP TABLE <table name>;`.
*It is important to note that this command does not drop the tables from the database itself: despite the use of the term "drop" in this command, the tables themselves continue to exist and contain the same data. They are simply no longer associated with the extension, so pg_dump can now dump them to the dump file.*
To perform the full restore and immediately run the hived server:
1. Execute ```load_instance.sh```
After the database has been dumped, the tables can be reassociated with an extension using the command:
`ALTER EXTENSION <extension name> ADD TABLE <table name>;`
This reassociation needs to be done on the original database that was dumped, and also on any database that is restored from the dump file using `pg_restore`.
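A minimal sketch of that manual dump/restore cycle, assuming the `hive_fork_manager` extension, a `haf_block_log` source database, a hypothetical `haf_block_log_copy` target database, and `hive.blocks` standing in for each table that needs to be detached:
```
# Detach an extension table so pg_dump will include it (repeat for every table)
psql -d haf_block_log -c "ALTER EXTENSION hive_fork_manager DROP TABLE hive.blocks;"

# Dump the database while the tables are detached
pg_dump -Fc -d haf_block_log -f haf_block_log.dump

# Reassociate the table on the source database...
psql -d haf_block_log -c "ALTER EXTENSION hive_fork_manager ADD TABLE hive.blocks;"

# ...and on any database restored from the dump file
# (the target database is assumed to already have the hive_fork_manager extension installed)
pg_restore -d haf_block_log_copy haf_block_log.dump
psql -d haf_block_log_copy -c "ALTER EXTENSION hive_fork_manager ADD TABLE hive.blocks;"
```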
All unrecognized options are forwarded to the hived executable.
E.g.:
```scripts/dump_instance.sh --backup-dir=path_to_backup_directory \
--hived-executable-path=path_to_hived \
--hived-data-dir=path_to_hived_datadir \
--haf-db-name=haf_block_log \
--override-existing-backup-dir \
--exit-before-sync
scripts/load_instance.sh --backup-dir=path_to_backup_directory \
--hived-executable-path=path_to_hived \
--hived-data-dir=path_to_hived_datadir \
--haf-db-name=haf_block_log \
--exit-before-sync --stop-replay-at-block=5000000
```
Hive_fork_manager provides scripts which disassociate and reassociate its tables: [./src/hive_fork_manager/tools/pg_dump](./src/hive_fork_manager/tools/pg_dump/readme.md).
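The `pg_catalog.pg_extension_config_dump` calls that appear throughout the extension scripts below register each hive_fork_manager table as an extension configuration table, so `pg_dump` includes its data even though the table still belongs to the extension. One way to check which tables a HAF database has registered this way (a sketch, reusing the `haf_block_log` name from the example above) is to inspect `pg_extension.extconfig`:
```
# List the hive_fork_manager tables registered for dumping via pg_extension_config_dump
psql -d haf_block_log -c \
  "SELECT unnest(extconfig)::regclass AS dumped_table
     FROM pg_extension
    WHERE extname = 'hive_fork_manager';"
```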

View File

@ -21,6 +21,7 @@ CREATE TABLE IF NOT EXISTS hive.contexts(
CONSTRAINT pk_hive_contexts PRIMARY KEY( id ),
CONSTRAINT uq_hive_context_name UNIQUE ( name )
);
SELECT pg_catalog.pg_extension_config_dump('hive.contexts', '');
CREATE INDEX IF NOT EXISTS hive_contexts_owner_idx ON hive.contexts( owner );
@ -36,6 +37,7 @@ CREATE TABLE IF NOT EXISTS hive.registered_tables(
CONSTRAINT fk_hive_registered_tables_context FOREIGN KEY(context_id) REFERENCES hive.contexts( id ),
CONSTRAINT uq_hive_registered_tables_register_table UNIQUE( origin_table_schema, origin_table_name )
);
SELECT pg_catalog.pg_extension_config_dump('hive.registered_tables', '');
CREATE INDEX IF NOT EXISTS hive_registered_tables_context_idx ON hive.registered_tables( context_id );
@ -51,6 +53,7 @@ CREATE TABLE IF NOT EXISTS hive.triggers(
CONSTRAINT fk_hive_triggers_registered_table FOREIGN KEY( registered_table_id ) REFERENCES hive.registered_tables( id ),
CONSTRAINT uq_hive_triggers_registered_table UNIQUE( trigger_name )
);
SELECT pg_catalog.pg_extension_config_dump('hive.triggers', '');
CREATE INDEX IF NOT EXISTS hive_registered_triggers_table_id ON hive.triggers( registered_table_id );
CREATE INDEX IF NOT EXISTS hive_triggers_owner_idx ON hive.triggers( owner );

View File

@ -11,8 +11,12 @@ CREATE TABLE IF NOT EXISTS hive.verify_table_schema(
table_indexes TEXT NOT NULL
);
SELECT pg_catalog.pg_extension_config_dump('hive.verify_table_schema', '');
CREATE TABLE IF NOT EXISTS hive.table_schema(
schema_name TEXT NOT NULL,
schema_hash UUID NOT NULL
);
SELECT pg_catalog.pg_extension_config_dump('hive.table_schema', '');

View File

@ -8,7 +8,6 @@ CREATE TABLE IF NOT EXISTS hive.events_queue(
, event hive.event_type NOT NULL
, block_num BIGINT NOT NULL
);
INSERT INTO hive.events_queue VALUES( 0, 'NEW_IRREVERSIBLE', 0 ) ON CONFLICT DO NOTHING;
SELECT pg_catalog.pg_extension_config_dump('hive.events_queue', '');
CREATE INDEX IF NOT EXISTS hive_events_queue_block_num_idx ON hive.events_queue( block_num );

View File

@ -4,6 +4,5 @@ CREATE TABLE IF NOT EXISTS hive.fork(
time_of_fork TIMESTAMP WITHOUT TIME ZONE NOT NULL, -- time of receiving notification from hived (see: hive.back_from_fork definition)
CONSTRAINT pk_hive_fork PRIMARY KEY( id )
);
SELECT pg_catalog.pg_extension_config_dump('hive.fork', '');
INSERT INTO hive.fork(block_num, time_of_fork) VALUES( 1, '2016-03-24 16:05:00'::timestamp )
ON CONFLICT DO NOTHING;

View File

@ -8,3 +8,4 @@ CREATE TABLE IF NOT EXISTS hive.indexes_constraints (
is_foreign_key boolean NOT NULL,
CONSTRAINT pk_hive_indexes_constraints UNIQUE( table_name, index_constraint_name )
);
SELECT pg_catalog.pg_extension_config_dump('hive.indexes_constraints', '');

View File

@ -4,4 +4,5 @@ CREATE TABLE IF NOT EXISTS hive.hived_connections(
git_sha TEXT,
time TIMESTAMP WITHOUT TIME ZONE NOT NULL,
CONSTRAINT pk_hived_connections PRIMARY KEY( id )
);
);
SELECT pg_catalog.pg_extension_config_dump('hive.hived_connections', '');

View File

@ -32,6 +32,7 @@ CREATE TABLE IF NOT EXISTS hive.blocks (
CONSTRAINT pk_hive_blocks PRIMARY KEY( num )
);
SELECT pg_catalog.pg_extension_config_dump('hive.blocks', '');
CREATE TABLE IF NOT EXISTS hive.irreversible_data (
id integer,
@ -41,7 +42,7 @@ CREATE TABLE IF NOT EXISTS hive.irreversible_data (
CONSTRAINT fk_1_hive_irreversible_data FOREIGN KEY (consistent_block) REFERENCES hive.blocks (num)
);
INSERT INTO hive.irreversible_data VALUES(1,NULL, FALSE) ON CONFLICT DO NOTHING;
SELECT pg_catalog.pg_extension_config_dump('hive.irreversible_data', '');
CREATE TABLE IF NOT EXISTS hive.transactions (
block_num integer NOT NULL,
@ -54,6 +55,7 @@ CREATE TABLE IF NOT EXISTS hive.transactions (
CONSTRAINT pk_hive_transactions PRIMARY KEY ( trx_hash ),
CONSTRAINT fk_1_hive_transactions FOREIGN KEY (block_num) REFERENCES hive.blocks (num)
);
SELECT pg_catalog.pg_extension_config_dump('hive.transactions', '');
CREATE TABLE IF NOT EXISTS hive.transactions_multisig (
trx_hash bytea NOT NULL,
@ -61,6 +63,7 @@ CREATE TABLE IF NOT EXISTS hive.transactions_multisig (
CONSTRAINT pk_hive_transactions_multisig PRIMARY KEY ( trx_hash, signature ),
CONSTRAINT fk_1_hive_transactions_multisig FOREIGN KEY (trx_hash) REFERENCES hive.transactions (trx_hash)
);
SELECT pg_catalog.pg_extension_config_dump('hive.transactions_multisig', '');
CREATE TABLE IF NOT EXISTS hive.operation_types (
id smallint NOT NULL,
@ -69,6 +72,7 @@ CREATE TABLE IF NOT EXISTS hive.operation_types (
CONSTRAINT pk_hive_operation_types PRIMARY KEY (id),
CONSTRAINT uq_hive_operation_types UNIQUE (name)
);
SELECT pg_catalog.pg_extension_config_dump('hive.operation_types', '');
CREATE TABLE IF NOT EXISTS hive.operations (
id bigint not null,
@ -89,6 +93,7 @@ CREATE TABLE IF NOT EXISTS hive.operations (
CONSTRAINT fk_1_hive_operations FOREIGN KEY (block_num) REFERENCES hive.blocks(num),
CONSTRAINT fk_2_hive_operations FOREIGN KEY (op_type_id) REFERENCES hive.operation_types (id)
);
SELECT pg_catalog.pg_extension_config_dump('hive.operations', '');
CREATE TABLE IF NOT EXISTS hive.applied_hardforks (
hardfork_num smallint NOT NULL,
@ -109,6 +114,7 @@ CREATE TABLE IF NOT EXISTS hive.accounts (
, CONSTRAINT uq_hive_accounst_name UNIQUE ( name )
, CONSTRAINT fk_1_hive_accounts FOREIGN KEY (block_num) REFERENCES hive.blocks (num)
);
SELECT pg_catalog.pg_extension_config_dump('hive.accounts', '');
CREATE TABLE IF NOT EXISTS hive.account_operations
(
@ -123,6 +129,7 @@ CREATE TABLE IF NOT EXISTS hive.account_operations
, CONSTRAINT hive_account_operations_uq_1 UNIQUE( account_id, account_op_seq_no )
, CONSTRAINT hive_account_operations_uq2 UNIQUE ( account_id, operation_id )
);
SELECT pg_catalog.pg_extension_config_dump('hive.account_operations', '');
CREATE INDEX IF NOT EXISTS hive_applied_hardforks_block_num_idx ON hive.applied_hardforks ( block_num );

View File

@ -4,6 +4,7 @@ CREATE TABLE IF NOT EXISTS hive.blocks_reversible(
EXCLUDING INDEXES
EXCLUDING IDENTITY
);
SELECT pg_catalog.pg_extension_config_dump('hive.blocks_reversible', '');
ALTER TABLE hive.blocks_reversible
ADD COLUMN IF NOT EXISTS fork_id BIGINT NOT NULL,
ADD CONSTRAINT pk_hive_blocks_reversible PRIMARY KEY( num, fork_id ),
@ -17,6 +18,7 @@ CREATE TABLE IF NOT EXISTS hive.transactions_reversible(
EXCLUDING INDEXES
EXCLUDING IDENTITY
);
SELECT pg_catalog.pg_extension_config_dump('hive.transactions_reversible', '');
ALTER TABLE hive.transactions_reversible
ADD COLUMN IF NOT EXISTS fork_id BIGINT NOT NULL,
ADD CONSTRAINT fk_1_hive_transactions_reversible FOREIGN KEY (block_num, fork_id) REFERENCES hive.blocks_reversible(num,fork_id),
@ -30,6 +32,7 @@ CREATE TABLE IF NOT EXISTS hive.transactions_multisig_reversible(
EXCLUDING INDEXES
EXCLUDING IDENTITY
);
SELECT pg_catalog.pg_extension_config_dump('hive.transactions_multisig_reversible', '');
ALTER TABLE hive.transactions_multisig_reversible
ADD COLUMN IF NOT EXISTS fork_id BIGINT NOT NULL,
ADD CONSTRAINT pk_transactions_multisig_reversible PRIMARY KEY ( trx_hash, signature, fork_id ),
@ -43,6 +46,7 @@ CREATE TABLE IF NOT EXISTS hive.operations_reversible(
EXCLUDING INDEXES
EXCLUDING IDENTITY
);
SELECT pg_catalog.pg_extension_config_dump('hive.operations_reversible', '');
ALTER TABLE hive.operations_reversible
ADD COLUMN IF NOT EXISTS fork_id BIGINT NOT NULL,
ADD CONSTRAINT pk_operations_reversible PRIMARY KEY( id, block_num, fork_id ),
@ -59,6 +63,7 @@ CREATE TABLE IF NOT EXISTS hive.accounts_reversible(
EXCLUDING INDEXES
EXCLUDING IDENTITY
);
SELECT pg_catalog.pg_extension_config_dump('hive.accounts_reversible', '');
ALTER TABLE hive.accounts_reversible
ADD COLUMN IF NOT EXISTS fork_id BIGINT NOT NULL,
ADD CONSTRAINT pk_hive_accounts_reversible_id PRIMARY KEY( id, fork_id ),
@ -75,6 +80,7 @@ CREATE TABLE IF NOT EXISTS hive.account_operations_reversible(
EXCLUDING IDENTITY
)
;
SELECT pg_catalog.pg_extension_config_dump('hive.account_operations_reversible', '');
ALTER TABLE hive.account_operations_reversible
ADD COLUMN IF NOT EXISTS fork_id BIGINT NOT NULL,
ADD CONSTRAINT fk_1_hive_account_operations_reversible FOREIGN KEY ( operation_id, fork_id ) REFERENCES hive.operations_reversible( id, fork_id ),

View File

@ -9,5 +9,6 @@ CREATE TABLE IF NOT EXISTS hive.state_providers_registered(
, CONSTRAINT uq_hive_state_providers_registered_contexts_provider UNIQUE ( context_id, state_provider )
, CONSTRAINT fk_hive_state_providers_registered_context FOREIGN KEY( context_id ) REFERENCES hive.contexts( id )
);
SELECT pg_catalog.pg_extension_config_dump('hive.state_providers_registered', '');
CREATE INDEX IF NOT EXISTS hive_state_providers_registered_idx ON hive.state_providers_registered( owner );

View File

@ -1,23 +0,0 @@
alter extension hive_fork_manager add table hive.contexts;
alter extension hive_fork_manager add table hive.registered_tables;
alter extension hive_fork_manager add table hive.triggers;
alter extension hive_fork_manager add table hive.events_queue;
alter extension hive_fork_manager add table hive.fork;
alter extension hive_fork_manager add table hive.indexes_constraints;
alter extension hive_fork_manager add table hive.blocks;
alter extension hive_fork_manager add table hive.irreversible_data;
alter extension hive_fork_manager add table hive.transactions;
alter extension hive_fork_manager add table hive.transactions_multisig;
alter extension hive_fork_manager add table hive.operation_types;
alter extension hive_fork_manager add table hive.operations;
alter extension hive_fork_manager add table hive.blocks_reversible;
alter extension hive_fork_manager add table hive.transactions_reversible;
alter extension hive_fork_manager add table hive.transactions_multisig_reversible;
alter extension hive_fork_manager add table hive.operations_reversible;
alter extension hive_fork_manager add table hive.accounts_reversible;
alter extension hive_fork_manager add table hive.account_operations_reversible;
alter extension hive_fork_manager add table hive.accounts;
alter extension hive_fork_manager add table hive.account_operations;
alter extension hive_fork_manager add table hive.applied_hardforks_reversible;
alter extension hive_fork_manager add table hive.applied_hardforks;

View File

@ -1,24 +0,0 @@
alter extension hive_fork_manager drop table hive.contexts;
alter extension hive_fork_manager drop table hive.registered_tables;
alter extension hive_fork_manager drop table hive.triggers;
alter extension hive_fork_manager drop table hive.events_queue;
alter extension hive_fork_manager drop table hive.fork;
alter extension hive_fork_manager drop table hive.indexes_constraints;
alter extension hive_fork_manager drop table hive.blocks;
alter extension hive_fork_manager drop table hive.irreversible_data;
alter extension hive_fork_manager drop table hive.transactions;
alter extension hive_fork_manager drop table hive.transactions_multisig;
alter extension hive_fork_manager drop table hive.operation_types;
alter extension hive_fork_manager drop table hive.operations;
alter extension hive_fork_manager drop table hive.blocks_reversible;
alter extension hive_fork_manager drop table hive.transactions_reversible;
alter extension hive_fork_manager drop table hive.transactions_multisig_reversible;
alter extension hive_fork_manager drop table hive.operations_reversible;
alter extension hive_fork_manager drop table hive.accounts_reversible;
alter extension hive_fork_manager drop table hive.account_operations_reversible;
alter extension hive_fork_manager drop table hive.accounts;
alter extension hive_fork_manager drop table hive.account_operations;
alter extension hive_fork_manager drop table hive.applied_hardforks_reversible;
alter extension hive_fork_manager drop table hive.applied_hardforks;

View File

@ -1,11 +0,0 @@
# Scripts which allow use of pg_dump and pg_restore with hive_fork_manager
Tables which are part of a Postgres extension are not dumped by pg_dump. To solve this problem
they must be removed from the extension, so the procedure to dump the database looks as below:
1. Execute [drop_from_extension.sql](./drop_from_extension.sql) on the db, to drop the tables from the extension
2. Issue pg_dump on the db
3. Execute [add_to_extension.sql](./add_to_extension.sql) on the db, to restore its valid state
Restore procedure:
1. Issue pg_restore with --disable-triggers
2. Execute [add_to_extension.sql](./add_to_extension.sql) on the db, to restore its valid state
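Put together, the procedure might look like this (a sketch: the database names, the dump file name, and running the scripts through psql are assumptions, not part of the original instructions):
```
# Dump: detach the tables, run pg_dump, then restore the extension's valid state
psql -d haf_block_log -f drop_from_extension.sql
pg_dump -Fc -d haf_block_log -f haf.dump
psql -d haf_block_log -f add_to_extension.sql

# Restore into a prepared HAF database (assumed to already have the extension installed),
# then reassociate the tables there as well
pg_restore --disable-triggers -d haf_block_log_restored haf.dump
psql -d haf_block_log_restored -f add_to_extension.sql
```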

View File

@ -74,6 +74,12 @@ bool is_database_correct( const std::string& database_url, bool force_open_incon
database_url
, "Check consistency of irreversible data"
, [&is_irreversible_dirty](const data_processor::data_chunk_ptr&, transaction_controllers::transaction& tx) -> data_processor::data_processing_status {
// these tables need to be empty in the HAF extension script because of pg_dump/pg_restore
tx.exec("INSERT INTO hive.irreversible_data VALUES(1,NULL, FALSE) ON CONFLICT DO NOTHING;");
tx.exec("INSERT INTO hive.events_queue VALUES( 0, 'NEW_IRREVERSIBLE', 0 ) ON CONFLICT DO NOTHING;");
tx.exec("INSERT INTO hive.fork(block_num, time_of_fork) VALUES( 1, '2016-03-24 16:05:00'::timestamp ) ON CONFLICT DO NOTHING;");
pqxx::result data = tx.exec("select hive.is_irreversible_dirty() as _result;");
FC_ASSERT( !data.empty(), "No response from database" );
FC_ASSERT( data.size() == 1, "Wrong data size" );

View File

@ -9,6 +9,8 @@ postgres_port=$4;
setup_test_database "$setup_scripts_dir_path" "$postgres_port" "$test_path"
psql -p $postgres_port -d $DB_NAME -a -v ON_ERROR_STOP=on -f ./test_tools.sql;
# add test functions:
# load tests function
psql -p $postgres_port -d $DB_NAME -a -v ON_ERROR_STOP=on -f ${test_path};

View File

@ -9,6 +9,8 @@ postgres_port=$4;
setup_test_database "$setup_scripts_dir_path" "$postgres_port" "$test_path"
psql -p $postgres_port -d $DB_NAME -a -v ON_ERROR_STOP=on -f ./test_tools.sql;
psql postgresql://test_hived:test@localhost:$postgres_port/$DB_NAME --username=test_hived -a -v ON_ERROR_STOP=on -f ./examples/prepare_data.sql
evaluate_result $?;

View File

@ -1,3 +1,8 @@
INSERT INTO hive.irreversible_data VALUES(1,NULL, FALSE) ON CONFLICT DO NOTHING;
INSERT INTO hive.events_queue VALUES( 0, 'NEW_IRREVERSIBLE', 0 ) ON CONFLICT DO NOTHING;
INSERT INTO hive.fork(block_num, time_of_fork) VALUES( 1, '2016-03-24 16:05:00'::timestamp ) ON CONFLICT DO NOTHING;
CREATE OR REPLACE FUNCTION hive.unordered_arrays_equal(arr1 TEXT[], arr2 TEXT[])
RETURNS bool
LANGUAGE plpgsql

View File

@ -1,17 +1,26 @@
from __future__ import annotations
import time
from pathlib import Path
from typing import Dict
from typing import Any, TYPE_CHECKING, Dict, Optional
import test_tools as tt
from haf_local_tools import block_logs
from haf_local_tools.tables import EventsQueue
from shared_tools.complex_networks import run_networks
if TYPE_CHECKING:
from sqlalchemy.engine.row import Row
from sqlalchemy.orm.session import Session
BLOCKS_IN_FORK = 5
BLOCKS_AFTER_FORK = 5
WAIT_FOR_CONTEXT_TIMEOUT = 90.0
def make_fork(networks: Dict[str, tt.Network], main_chain_trxs=[], fork_chain_trxs=[]):
alpha_net = networks['Alpha']
beta_net = networks['Beta']
@ -64,6 +73,10 @@ def get_irreversible_block(node):
return irreversible_block_num
def get_blocklog_directory():
return Path(__file__).parent.resolve()
def prepare_networks(networks: Dict[str, tt.Network], replay_all_nodes = True):
blocklog_directory = None
if replay_all_nodes:
@ -71,10 +84,11 @@ def prepare_networks(networks: Dict[str, tt.Network], replay_all_nodes = True):
run_networks(list(networks.values()), blocklog_directory)
def create_node_with_database(network: tt.Network, url):
def create_node_with_database(url: str, network: Optional[tt.Network] = None) -> tt.ApiNode:
api_node = tt.ApiNode(network=network)
api_node.config.plugin.append('sql_serializer')
api_node.config.psql_url = str(url)
api_node.config.psql_url = url
return api_node
@ -154,3 +168,13 @@ def wait_until_irreversible(node_under_test, session):
if result[ len(result) - 1 ].event == 'NEW_IRREVERSIBLE':
return
def query_col(session: Session, sql: str, **kwargs) -> list[Any]:
"""Perform a `SELECT n*1`"""
return [row[0] for row in session.execute(sql, params=kwargs).fetchall()]
def query_all(session: Session, sql: str, **kwargs) -> list[Row]:
"""Perform a `SELECT n*m`"""
return session.execute(sql, params=kwargs).fetchall()

View File

@ -16,8 +16,6 @@ def test_compare_forked_node_database(prepared_networks_and_database, database):
session_ref = database('postgresql:///haf_block_log_ref')
reference_node = create_node_with_database(networks['Alpha'], session_ref.get_bind().url)
# WHEN
prepare_networks(networks)
node_under_test.wait_for_block_with_number(START_TEST_BLOCK)

View File

@ -0,0 +1,138 @@
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Final
import pytest
import test_tools as tt
from local_tools import create_node_with_database, get_blocklog_directory, query_all, query_col
if TYPE_CHECKING:
from sqlalchemy.engine.row import Row
from sqlalchemy.engine.url import URL
from sqlalchemy.orm.session import Session
DUMP_FILENAME: Final[str] = "adump.Fcsql"
SQL_ALL_TABLES_AND_VIEWS: Final[str] = """
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'hive' ORDER BY table_name;
"""
SQL_TABLE_COLUMNS: Final[str] = """
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'hive' AND table_name = :table;
"""
SQL_TABLE_CONTENT: Final[str] = """
SELECT *
FROM hive.{table}
ORDER BY {columns};
"""
def pg_restore_from_toc(target_db_name: str, tmp_path: Path) -> None:
"""
For debugging purposes it is sometimes valuable to display dump contents like this:
pg_restore --section=pre-data --disable-triggers -Fc -f adump-pre-data.sql adump.Fcsql
"""
dump_file_path = tmp_path / DUMP_FILENAME
original_toc = tmp_path / f'{target_db_name}_org.toc'
stripped_toc = tmp_path / f'{target_db_name}_stripped.toc'
shell(f"pg_restore --exit-on-error -l {dump_file_path} > {original_toc}")
shell(
fr"grep -v '[0-9]\+; [0-9]\+ [0-9]\+ SCHEMA - hive' {original_toc}"
fr"| grep -v '[0-9]\+; [0-9]\+ [0-9]\+ POLICY hive' > {stripped_toc}"
)
shell(
f"pg_restore --exit-on-error --single-transaction -L {stripped_toc} -d {target_db_name} {dump_file_path}"
)
def pg_restore_from_dump_file_only(target_db_name: str, tmp_path: Path) -> None:
dump_file_path = tmp_path / DUMP_FILENAME
shell(f"pg_restore --section=pre-data --disable-triggers -d {target_db_name} {dump_file_path}")
shell(f"pg_restore -j 3 --section=data --disable-triggers -d {target_db_name} {dump_file_path}")
shell(f"pg_restore --section=post-data --disable-triggers --clean --if-exists -d {target_db_name} {dump_file_path}")
@pytest.mark.parametrize("pg_restore", [pg_restore_from_toc, pg_restore_from_dump_file_only])
def test_pg_dump(database, pg_restore: Callable[[str, Path], None], tmp_path: Path):
# GIVEN
source_session, source_db_url = prepare_source_db(database)
target_session, target_db_url = prepare_target_db(database)
# WHEN
pg_dump(source_db_url.database, tmp_path)
pg_restore(target_db_url.database, tmp_path)
# THEN
compare_databases(source_session, target_session)
compare_psql_tool_dumped_schemas(source_db_url.database, target_db_url.database, tmp_path)
def prepare_source_db(database) -> tuple[Session, URL]:
session, _ = database('postgresql:///test_pg_dump_source')
db_name = session.bind.url
node = create_node_with_database(url=str(db_name))
block_log = tt.BlockLog(get_blocklog_directory() / 'block_log')
node.run(replay_from=block_log, stop_at_block=105, exit_before_synchronization=True)
return session, db_name
def prepare_target_db(database) -> tuple[Session, URL]:
session, _ = database('postgresql:///test_pg_dump_target')
db_name = session.bind.url
return session, db_name
def pg_dump(db_name: str, tmp_path: Path) -> None:
shell(f'pg_dump -j 3 -Fd -d {db_name} -f {tmp_path / DUMP_FILENAME}')
def compare_databases(source_session: Session, target_session: Session) -> None:
source_table_names = query_col(source_session, SQL_ALL_TABLES_AND_VIEWS)
target_table_names = query_col(target_session, SQL_ALL_TABLES_AND_VIEWS)
assert source_table_names == target_table_names
for table in source_table_names:
source_recordset = take_table_contents(source_session, table)
target_recordset = take_table_contents(target_session, table)
assert source_recordset == target_recordset, f"ERROR: in table_or_view: {table}"
def take_table_contents(session: Session, table: str) -> list[Row]:
column_names = query_col(session, SQL_TABLE_COLUMNS, table=table)
columns = ', '.join(column_names)
sql_raw = SQL_TABLE_CONTENT.format(table=table, columns=columns)
return query_all(session, sql_raw)
def compare_psql_tool_dumped_schemas(source_db_name: str, target_db_name: str, tmp_path: Path) -> None:
source_schema = create_psql_tool_dumped_schema(source_db_name, tmp_path)
target_schema = create_psql_tool_dumped_schema(target_db_name, tmp_path)
assert source_schema == target_schema
def create_psql_tool_dumped_schema(db_name: str, tmp_path: Path) -> str:
schema_filename = tmp_path / (db_name + '_schema.txt')
shell(rf"psql -d {db_name} -c '\dn' > {schema_filename}")
shell(rf"psql -d {db_name} -c '\d hive.*' >> {schema_filename}")
with open(schema_filename, encoding="utf-8") as file:
return file.read()
def shell(command: str) -> None:
subprocess.call(command, shell=True)