Skip to content

Commit

Permalink
Fix issues with datatypes for Logical Replication (babelfish-for-post…
Browse files Browse the repository at this point in the history
…gresql#2268)

This commit fixes the following issues:

1. (Var)binary columns are not replicated correctly because the input function treats
the value as a raw ASCII string although the value is actually already in binary format.
Fixed this by adding an IsLogicalWorker() check alongside the existing TSQLHexConstTypmod
check in the varbinaryin function so that it correctly accepts the binary input sent by the publisher.
2. SQL_VARIANT columns are replicated correctly as far as the data contents are
concerned, but the metadata is lost in the process. This is because the sql_variant output
function returns the value as a string in its underlying datatype, which, when fed
back to the sql_variant input function, gets treated as a varchar value.
Fixed this by replicating sql_variant values as bytea. We use MyReplicationSlot
and MyWalSnd in the sqlvariantout function on the publisher and IsLogicalWorker() in
the sqlvariantin function on the subscriber to identify whether we are in a logical replication context.
3. On the subscriber side, IsLogicalWorker() is sufficient for the native PG apply worker
but does not work with external providers like pglogical, so we also rely on SessionReplicationRole
being set to replica, since most providers seem to set this GUC.

Additionally, this adds support for running logical replication tests in GitHub Actions along with
the JDBC framework.

Task: BABEL-3427, BABEL-4049
Signed-off-by: Rishabh Tanwar [email protected]
  • Loading branch information
rishabhtanwar29 authored Jan 29, 2024
1 parent 07993e5 commit 214ea58
Show file tree
Hide file tree
Showing 12 changed files with 931 additions and 38 deletions.
34 changes: 28 additions & 6 deletions .github/composite-actions/install-extensions/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ inputs:
description: 'Postgres Parallel Query Mode'
required: no
default: false
psql_port:
description: "Postgres port"
required: false
default: 5432
tsql_port:
description: "TDS port"
required: false
default: 1433
wal_level:
description: "wal_level GUC setting"
required: false
default: replica

runs:
using: "composite"
Expand All @@ -22,11 +34,20 @@ runs:
ulimit -c unlimited
cd ~
export PATH=/opt/mssql-tools/bin:$PATH
~/${{inputs.install_dir}}/bin/initdb -D ~/${{inputs.install_dir}}/data/
~/${{inputs.install_dir}}/bin/pg_ctl -c -D ~/${{inputs.install_dir}}/data/ -l logfile start
cd ${{inputs.install_dir}}/data
# add port as suffix in data directory name to make it unique
if [[ ${{inputs.psql_port}} != 5432 ]];then
export DATADIR=data_${{inputs.psql_port}}
else
export DATADIR=data
fi
~/${{inputs.install_dir}}/bin/initdb -D ~/${{inputs.install_dir}}/$DATADIR/
cd ${{inputs.install_dir}}/$DATADIR
sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/g" postgresql.conf
sudo sed -i "s/#shared_preload_libraries = ''/shared_preload_libraries = 'babelfishpg_tds, pg_stat_statements'/g" postgresql.conf
sudo sed -i "s/#port = 5432/port = ${{inputs.psql_port}}/g" postgresql.conf
sudo sed -i "s/#wal_level = replica/wal_level = ${{inputs.wal_level}}/g" postgresql.conf
ipaddress=$(ifconfig eth0 | grep 'inet ' | cut -d: -f2 | awk '{ print $2}')
# Allow only runner to have trust authentication, all other users must provide a password
{
Expand All @@ -37,8 +58,9 @@ runs:
sudo echo "host all all 0.0.0.0/0 md5"
sudo echo "host all all ::/0 md5"
} > pg_hba.conf
~/${{inputs.install_dir}}/bin/pg_ctl -c -D ~/${{inputs.install_dir}}/data/ -l logfile restart
~/${{inputs.install_dir}}/bin/pg_ctl -c -D ~/${{inputs.install_dir}}/$DATADIR/ -l logfile start
cd ~/work/babelfish_extensions/babelfish_extensions/
sudo ~/${{inputs.install_dir}}/bin/psql -d postgres -U runner -v user="jdbc_user" -v db="jdbc_testdb" -v migration_mode=${{inputs.migration_mode}} -v parallel_query_mode=${{inputs.parallel_query_mode}} -f .github/scripts/create_extension.sql
sqlcmd -S localhost -U "jdbc_user" -P 12345678 -Q "SELECT @@version GO"
sudo ~/${{inputs.install_dir}}/bin/psql -d postgres -U runner -p ${{inputs.psql_port}} -v user="jdbc_user" -v db="jdbc_testdb" -v migration_mode=${{inputs.migration_mode}} -v tsql_port=${{inputs.tsql_port}} -v parallel_query_mode=${{inputs.parallel_query_mode}} -f .github/scripts/create_extension.sql
~/${{inputs.install_dir}}/bin/pg_ctl -c -D ~/${{inputs.install_dir}}/$DATADIR/ -l logfile restart
sqlcmd -S localhost,${{inputs.tsql_port}} -U "jdbc_user" -P 12345678 -Q "SELECT @@version GO"
shell: bash
7 changes: 7 additions & 0 deletions .github/composite-actions/run-jdbc-tests/action.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
# Composite action that runs the JDBC test framework via `mvn test` from test/JDBC/.
name: 'Run JDBC Tests'
description: 'Run Babel JDBC test framework'

inputs:
# Optional input; its value is exported to the test run as the
# inputFilesPath environment variable. Defaults to "input".
input_dir:
description: 'Test input directory'
required: no
default: input

runs:
using: "composite"
steps:
# Puts the installed binaries (from env.INSTALL_DIR) on PATH, sets PG_SRC and
# inputFilesPath, then invokes Maven from the test/JDBC/ directory.
- name: Run JDBC Tests
run: |
export PATH=~/${{env.INSTALL_DIR}}/bin:$PATH
export PG_SRC=~/work/babelfish_extensions/postgresql_modified_for_babelfish
export inputFilesPath=${{inputs.input_dir}}
cd test/JDBC/
mvn test
shell: bash
1 change: 1 addition & 0 deletions .github/scripts/create_extension.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ GRANT ALL ON SCHEMA sys to :user;
ALTER USER :user CREATEDB;
ALTER SYSTEM SET babelfishpg_tsql.database_name = :db;
ALTER SYSTEM SET babelfishpg_tsql.migration_mode = :'migration_mode';
ALTER SYSTEM SET babelfishpg_tds.port = :tsql_port;

\if :parallel_query_mode
ALTER SYSTEM SET parallel_setup_cost = 0;
Expand Down
72 changes: 55 additions & 17 deletions .github/workflows/jdbc-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,58 +52,98 @@ jobs:
id: install-extensions
if: always() && steps.build-postgis-extension.outcome == 'success'
uses: ./.github/composite-actions/install-extensions
with:
wal_level: logical

- name: Run JDBC Tests
id: jdbc
if: always() && steps.install-extensions.outcome == 'success'
timeout-minutes: 60
uses: ./.github/composite-actions/run-jdbc-tests

- name: Start secondary server
id: start-secondary
if: always() && steps.jdbc.outcome == 'success'
uses: ./.github/composite-actions/install-extensions
with:
psql_port: 5433
tsql_port: 8199
wal_level: logical

- name: Setup Publication and Subscription
id: setup-pub-sub
if: always() && steps.start-secondary.outcome == 'success'
run: |
~/${{env.INSTALL_DIR}}/bin/psql -d jdbc_testdb -U runner -c "CREATE PUBLICATION my_pub;"
~/${{env.INSTALL_DIR}}/bin/psql -d jdbc_testdb -U runner -p 5433 -c "CREATE SUBSCRIPTION my_sub CONNECTION 'host=localhost port=5432 user=jdbc_user dbname=jdbc_testdb password=12345678' PUBLICATION my_pub;"
- name: Run Replication Tests
id: replication
if: always() && steps.setup-pub-sub.outcome == 'success'
timeout-minutes: 60
uses: ./.github/composite-actions/run-jdbc-tests
with:
input_dir: 'replication'

- name: Cleanup babelfish database
id: cleanup
if: always() && steps.install-extensions.outcome == 'success'
if: always() && steps.replication.outcome == 'success'
run: |
sudo ~/psql/bin/psql -d postgres -U runner -v user="jdbc_user" -v db="jdbc_testdb" -f .github/scripts/cleanup_babelfish_database.sql
- name: Upload Log
if: always() && steps.jdbc.outcome == 'failure'
if: always() && (steps.jdbc.outcome == 'failure' || steps.replication.outcome == 'failure')
uses: actions/upload-artifact@v2
with:
name: postgres-log-jdbc
path: ~/psql/data/logfile

path: |
~/psql/data/logfile
~/psql/data_5433/logfile
# The test summary files contain paths with ':' characters, which is not allowed with the upload-artifact actions
- name: Rename Test Summary Files
id: test-file-rename
if: always() && steps.jdbc.outcome == 'failure'
if: always() && (steps.jdbc.outcome == 'failure' || steps.replication.outcome == 'failure')
run: |
cd test/JDBC/Info
timestamp=`ls -Art | tail -n 1`
cd $timestamp
mv $timestamp.diff ../output-diff.diff
mv "$timestamp"_runSummary.log ../run-summary.log
cd ..
# get the replication output diff as well if it is present
dir_count=`ls | wc -l`
if [[ $dir_count -eq 2 ]];then
timestamp=`ls -rt | tail -n 2 | sort -r | tail -n 1`
cd $timestamp
mv $timestamp.diff ../replication-output-diff.diff
mv "$timestamp"_runSummary.log ../replication-run-summary.log
- name: Upload Run Summary
if: always() && steps.test-file-rename == 'success'
if: always() && steps.test-file-rename.outcome == 'success'
uses: actions/upload-artifact@v2
with:
name: run-summary.log
path: test/JDBC/Info/run-summary.log

path: |
test/JDBC/Info/run-summary.log
test/JDBC/Info/replication-run-summary.log
- name: Upload Output Diff
if: always() && steps.jdbc.outcome == 'failure'
if: always() && (steps.jdbc.outcome == 'failure' || steps.replication.outcome == 'failure')
uses: actions/upload-artifact@v2
with:
name: jdbc-output-diff.diff
path: test/JDBC/Info/output-diff.diff
path: |
test/JDBC/Info/output-diff.diff
test/JDBC/Info/replication-output-diff.diff
- name: Check and upload coredumps
if: always() && steps.jdbc.outcome == 'failure'
if: always() && (steps.jdbc.outcome == 'failure' || steps.replication.outcome == 'failure')
uses: ./.github/composite-actions/upload-coredump

- name: Generate Code Coverage
id: generate-code-coverage
if: always() && steps.jdbc.outcome == 'success'
if: always() && (steps.jdbc.outcome == 'success' && steps.replication.outcome == 'success')
run: |
export PG_CONFIG=~/psql/bin/pg_config
export PG_SRC=~/work/postgresql_modified_for_babelfish
Expand All @@ -116,7 +156,7 @@ jobs:
cd ..
done
shell: bash

- name: Summarize code coverage
id: code-coverage-summary
if: always() && steps.generate-code-coverage.outcome == 'success'
Expand All @@ -132,5 +172,3 @@ jobs:
name: coverage-babelfish-extensions-jdbc
path: contrib/jdbc-lcov.info
retention-days: 1


23 changes: 23 additions & 0 deletions contrib/babelfishpg_common/src/logical.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
 * logical.h
 *	  Helper macros for detecting a logical replication context.
 *
 * NOTE(review): this header only includes postgres.h. The symbols used by
 * the macros below (IsLogicalWorker, SessionReplicationRole,
 * MyReplicationSlot, SlotIsLogical, MyWalSnd, REPLICATION_KIND_LOGICAL) are
 * not declared here, so a file that expands a macro presumably has to
 * include the corresponding PostgreSQL headers itself -- confirm.
 */
#ifndef LOGICAL_H
#define LOGICAL_H

#include "postgres.h"

/*
 * IS_LOGICAL_RECEIVER
 *
 * True when the current process should be treated as the receiving
 * (subscriber) side of logical replication.
 *
 * IsLogicalWorker() is sufficient for the native PG apply worker but does
 * not work with external providers like pglogical, so we also accept
 * SessionReplicationRole being replica, since most providers seem to set
 * this GUC.
 */
#define IS_LOGICAL_RECEIVER() (IsLogicalWorker() || SessionReplicationRole == SESSION_REPLICATION_ROLE_REPLICA)

/*
 * IS_LOGICAL_SENDER
 *
 * True when the current process should be treated as the sending
 * (publisher) side of logical replication. Either of two criteria
 * qualifies:
 * 1. MyReplicationSlot is set and is a logical slot.
 * 2. MyWalSnd is set and its kind is REPLICATION_KIND_LOGICAL, i.e. this is
 *    a logical walsender process.
 */
#define IS_LOGICAL_SENDER() \
((MyReplicationSlot != NULL && SlotIsLogical(MyReplicationSlot)) || \
(MyWalSnd != NULL && MyWalSnd->kind == REPLICATION_KIND_LOGICAL))

#endif /* LOGICAL_H */
13 changes: 13 additions & 0 deletions contrib/babelfishpg_common/src/sqlvariant.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "catalog/pg_type.h"
#include "catalog/pg_operator.h"
#include "commands/dbcommands.h"
#include "commands/trigger.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "port.h"
Expand All @@ -29,6 +30,9 @@
#include "parser/parse_coerce.h"
#include "parser/parse_oper.h"
#include "instr.h"
#include "replication/logicalworker.h"
#include "replication/walsender_private.h"
#include "replication/slot.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/guc.h"
Expand All @@ -43,6 +47,7 @@

#include "collation.h"
#include "datetimeoffset.h"
#include "logical.h"
#include "typecode.h"
#include "numeric.h"
#include "sqlvariant.h"
Expand Down Expand Up @@ -79,6 +84,10 @@ sqlvariantin(PG_FUNCTION_ARGS)
Oid typIOParam;
svhdr_5B_t *svhdr;

/* Input as a bytea instead if it is logical replication applyworker. */
if (IS_LOGICAL_RECEIVER())
PG_RETURN_DATUM(byteain(fcinfo));

getTypeInputInfo(type, &input_func, &typIOParam);
/* evalute input fuction */
data_val = (text *) OidInputFunctionCall(input_func, str, typelem, atttypmod);
Expand Down Expand Up @@ -130,6 +139,10 @@ sqlvariantout(PG_FUNCTION_ARGS)
size_t data_len = VARSIZE_ANY_EXHDR(vlena) - svhdr_size;
Datum *output_datum = palloc0(SIZEOF_DATUM);

/* Output as a bytea instead if we are in a logical decoding context. */
if (IS_LOGICAL_SENDER())
PG_RETURN_DATUM(byteaout(fcinfo));

if (!get_typbyval(type)) /* pass by reference */
*output_datum = SV_DATUM(vlena, svhdr_size);
else /* pass by value */
Expand Down
14 changes: 11 additions & 3 deletions contrib/babelfishpg_common/src/varbinary.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "catalog/pg_collation.h"
#include "catalog/pg_type.h"
#include "collation.h"
#include "commands/trigger.h"
#include "common/int.h"
#include "encoding/encoding.h"
#include "lib/hyperloglog.h"
Expand All @@ -27,6 +28,7 @@
#include "parser/scansup.h"
#include "port/pg_bswap.h"
#include "regex/regex.h"
#include "replication/logicalworker.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/bytea.h"
Expand All @@ -38,6 +40,7 @@
#include "utils/varlena.h"

#include "instr.h"
#include "logical.h"

PG_FUNCTION_INFO_V1(varbinaryin);
PG_FUNCTION_INFO_V1(varbinaryout);
Expand Down Expand Up @@ -177,10 +180,15 @@ varbinaryin(PG_FUNCTION_ARGS)

len = strlen(inputText);

/*
* Assume that input string is already hex encoded for following cases:
* 1. Typmode is TSQLHexConstTypmod
* 2. dump_restore GUC is set.
* 3. This is logical replication applyworker.
*/
if (typmod == TSQLHexConstTypmod ||
(dump_restore && strcmp(dump_restore, "on") == 0)) /* Treat input string as
* T-SQL hex constant
* during restore */
(dump_restore && strcmp(dump_restore, "on") == 0) ||
IS_LOGICAL_RECEIVER())
{
/*
* calculate length of the binary code e.g. 0xFF should be 1 byte
Expand Down
Loading

0 comments on commit 214ea58

Please sign in to comment.