Skip to content

Commit

Permalink
Added DataPipeline Support tools
Browse files Browse the repository at this point in the history
  • Loading branch information
EC2 Default User committed Aug 28, 2017
1 parent c3f1026 commit acc7ed1
Show file tree
Hide file tree
Showing 8 changed files with 1,132 additions and 0 deletions.
14 changes: 14 additions & 0 deletions DataPipeline/MySqlRdsToPostgreSqlRds/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
These Scripts are used to copy a Mysql RDS Table to a Postgresql RDS Table using Data Pipeline.


dbconv_mysqlRDS-to-postgresqlRDS.sh is run by a ShellcommandActivity in the pipeline.
mysql_to_redshift.py script translates schema from Mysql to Postgresql.


Please host dbconv_mysqlRDS-to-postgresqlRDS.sh and mysql_to_redshift.py in your S3 bucket and make them Public.


mysqlRDS-psqlRDS-copy-using-shell-definition.json is a sample pipeline definition file that illustrates usage of the above two scripts.


Warning: The scripts are not exhaustively tested for all data types. Pleasse test the scripts on a subset of your original data before using for Production. timestamp column types may require more customization.
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@

#!/bin/bash
#Example Invocation
#./dbconv.sh --rds_jdbc=jdbc:mysql://dbtest.cob91vaba6fq.us-east-1.rds.amazonaws.com:3306/sakila
# --rds_tbl=customer --rds_pwd=testpassword --rds_usr=admin
# --red_jdbc=jdbc:postgresql://eudb3.cvprvckckqrm.eu-west-1.redshift.amazonaws.com:5439/dbtest?tcpKeepAlive=true
# --red_usr=admin --red_pwd=test123E —red_tbl=RedTub
# —red_dist=customer_id —red_sort=create_date --red_ins=OVERWRITE_EXISTING


echo "Number of arguments: $#"
#echo "Arguments: $@"

for i in "$@"
do
case "$i" in
--rds_jdbc=*|-a=*)
RDSJdbc="${i#*=}"
shift
;;
-b=*|--rds_usr=*)
RDSUsr="${i#*=}"
shift
;;
-c=*|--rds_pwd=*)
RDSPwd="${i#*=}"
shift
;;
-d=*|--rds_tbl=*)
RDSTbl="${i#*=}"
shift
;;
-e=*|--red_jdbc=*)
REDJdbc="${i#*=}"
shift
;;
-f=*|--red_usr=*)
REDUsr="${i#*=}"
shift
;;
-g=*|--red_pwd=*)
REDPwd="${i#*=}"
shift
;;
-h=*|--red_tbl=*)
REDTbl="${i#*=}"
shift
;;
-i=*|--red_dist=*)
REDDist="${i#*=}"
shift
;;
-j=*|--red_sort=*)
REDSort="${i#*=}"
shift
;;
-k=*|--red_map=*)
REDMap="${i#*=}"
shift
;;
-l=*|--red_ins=*)
REDIns="${i#*=}"
shift
;;
*)
echo "unknown option"
;;
esac
done

echo "RDS Jdbc: $RDSJdbc"
echo "RDS Usr: $RDSUsr"
#echo "RDS Pwd: $RDSPwd"
echo "RDS Tbl: $RDSTbl"

echo "REDShift Jdbc: $REDJdbc"
echo "RED Usr: $REDUsr"
#echo "RED Pwd: $REDPwd"
echo "(Optional) REDShift Generated Tbl: $REDTbl"
echo "(Optional) REDShift Distribution Key: $REDDist"
echo "(Optional) REDShift Sort Key(s): $REDSort"
echo "(Optional) REDShift Default Translation Override Map: $REDMap"
echo "(Optional) REDShift Data Insert Mode: $REDIns"

# exit script on error
set -e

#1. Install MySQL and Postgresql client including mysqldump. Match the version of postgresql client to that of your RDS Postgresql version.
sudo yum install mysql postgresql93 -y

#3. Parse RDS Jdbc Connect String
RDSHost=`echo $RDSJdbc | awk -F: '{print $3}' | sed 's/\///g'`
echo "RDS Host: $RDSHost"
RDSPort=`echo $RDSJdbc | awk -F: '{print $4}' | awk -F/ '{print $1}'`
echo "RDS Port: $RDSPort"
MySQLDb=`echo $RDSJdbc | awk -F: '{print $4}' | awk -F/ '{print $2}'`
echo "RDS MySQLDB: $MySQLDb"

#4. Parse Redshift Jdbc Connect String
#"jdbc:postgresql://eudb3.cvprvckckqrm.eu-west-1.redshift.amazonaws.com:5439/dbtest?tcpKeepAlive=true"
REDHost=`echo $REDJdbc | awk -F: '{print $3}' | sed 's/\///g'`
echo "REDShift Host: $REDHost"
REDPort=`echo $REDJdbc | awk -F: '{print $4}' | awk -F/ '{print $1}'`
echo "REDShift Port: $REDPort"
REDDb=`echo $REDJdbc | awk -F: '{print $4}' | awk -F/ '{print $2}' | awk -F? '{print $1}'`
echo "REDShift DB: $REDDb"

#5. Dump MySQL Table definition
MySQLFile=rdsmy$(date +%m%d%H%M%S).sql
echo "My SQL dump file: $MySQLFile"
`mysqldump -h $RDSHost --port=$RDSPort -u $RDSUsr --password=$RDSPwd --compatible=postgresql --default-character-set=utf8 -n -d -r $MySQLFile $MySQLDb $RDSTbl`
echo "$MySQLFile created with mysqldump command"

#6. Download the translator python script.
curl -O https://s3.amazonaws.com/datapipeline-us-east-1/sample-scripts/mysql_to_redshift.py

#7. Translate MySQL to Postgresql
RedFile=red$(date +%m%d%H%M%S).psql
echo "created $RedFile file for writing"
echo "calling python script to generate schema file"
python mysql_to_redshift.py --input_file=$MySQLFile --output_file=$RedFile --table_name=$REDTbl --dist_key=$REDDist --sort_keys=$REDSort --map_types=$REDMap --insert_mode=$REDIns
echo "Generated Redshift file: $RedFile"

#8. Import it into Postgresql RDS and create the table
export PGPASSWORD=$REDPwd
psql -h $REDHost -p $REDPort -U $REDUsr -d $REDDb -f $RedFile
echo "postgresql Target table created"

fname=`find /home/ec2-user/ -name '*.csv'|xargs basename`
echo filename=$fname
#9. Copy CSV data from S3 to local EC2 and then copy the row data to target Postgresql RDS table
psql -h $REDHost -p $REDPort -U $REDUsr -d $REDDb -c '\COPY '$REDTbl' FROM '/home/ec2-user/$fname' CSV'
echo "Data copied to Target table"
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
{
"objects": [
{
"input": {
"ref": "S3StagingDataNode"
},
"stage": "false",
"dependsOn": {
"ref": "RDSToS3CopyActivity"
},
"name": "SyncS3CsvToEc2",
"runsOn": {
"ref": "Ec2Instance"
},
"id": "SyncS3CsvToEc2",
"type": "ShellCommandActivity",
"command": "(sudo yum -y update aws-cli) && (aws s3 sync #{input.directoryPath}/ /home/ec2-user/)"
},
{
"directoryPath": "#{myS3StagingLoc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}",
"name": "S3StagingDataNode",
"id": "S3StagingDataNode",
"type": "S3DataNode"
},
{
"output": {
"ref": "DestJdbcTable"
},
"input": {
"ref": "S3StagingDataNode"
},
"dependsOn": {
"ref": "SyncS3CsvToEc2"
},
"scriptUri": "s3://testbucket/scripts/dbconv_mysqlRDS-to-postgresqlRDS.sh",
"name": "JdbcTableCreateActivity",
"runsOn": {
"ref": "Ec2Instance"
},
"scriptArgument": [
"--rds_jdbc=#{myRDSJdbcConnectStr}",
"--rds_tbl=#{myRDSTableName}",
"--rds_pwd=#{*myRDSPassword}",
"--rds_usr=#{myRDSUsername}",
"--red_jdbc=#{myJdbcConnectStr}",
"--red_usr=#{myJdbcUsername}",
"--red_pwd=#{*myJdbcPassword}",
"--red_tbl=#{myJdbcTableName}",
"--red_dist=#{myJdbcDistributionKey}",
"--red_sort=#{myJdbcSortKeys}",
"--red_map=#{myJdbcTypeConvOverrideMap}"
],
"id": "JdbcTableCreateActivity",
"type": "ShellCommandActivity"
},
{
"database": {
"ref": "JdbcCluster"
},
"name": "DestJdbcTable",
"insertQuery": "#{myJdbcTableInsertSql}",
"id": "DestJdbcTable",
"type": "SqlDataNode",
"table": "#{myJdbcTableName}",
"selectQuery": "select * from #{table}"
},
{
"databaseName": "#{myRDSDatabaseName}",
"*password": "#{*myRDSPassword}",
"name": "rds_mysql",
"jdbcProperties": "allowMultiQueries=true",
"id": "rds_mysql",
"type": "RdsDatabase",
"rdsInstanceId": "#{myRDSinstanceid}",
"username": "#{myRDSUsername}"
},
{
"subnetId": "subnet-7b3a9f46",
"securityGroupIds": [
"sg-f3973b95",
"sg-068b2160"
],
"instanceType": "m3.medium",
"name": "Ec2Instance",
"keyPair": "ec2-VA-keypair",
"id": "Ec2Instance",
"type": "Ec2Resource",
"terminateAfter": "2 Hours"
},
{
"connectionString": "#{myJdbcConnectStr}",
"*password": "#{*myJdbcPassword}",
"name": "JdbcCluster",
"id": "JdbcCluster",
"jdbcProperties": "tcpKeepAlive=true",
"type": "JdbcDatabase",
"jdbcDriverClass": "org.postgresql.Driver",
"username": "#{myJdbcUsername}"
},
{
"output": {
"ref": "S3StagingDataNode"
},
"input": {
"ref": "SrcRDSTable"
},
"name": "RDSToS3CopyActivity",
"id": "RDSToS3CopyActivity",
"runsOn": {
"ref": "Ec2Instance"
},
"type": "CopyActivity"
},
{
"failureAndRerunMode": "CASCADE",
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "s3://testbucket/logs/",
"scheduleType": "ONDEMAND",
"name": "Default",
"id": "Default"
},
{
"database": {
"ref": "rds_mysql"
},
"name": "SrcRDSTable",
"id": "SrcRDSTable",
"type": "SqlDataNode",
"table": "#{myRDSTableName}",
"selectQuery": "select * from #{table}"
},
{
"input": {
"ref": "S3StagingDataNode"
},
"dependsOn": {
"ref": "JdbcTableCreateActivity"
},
"stage": "false",
"name": "S3StagingCleanupActivity",
"id": "S3StagingCleanupActivity",
"runsOn": {
"ref": "Ec2Instance"
},
"type": "ShellCommandActivity",
"command": "(aws s3 rm #{input.directoryPath} --recursive)"
}
],
"parameters": [
{
"helpText": "The S3 folder to store RDS MySQL table data before loading to Jdbc. The S3 folder must be in the same region as the Jdbc cluster.",
"description": "S3 staging folder",
"id": "myS3StagingLoc",
"type": "AWS::S3::ObjectKey"
},
{
"description": "RDS MySQL password",
"id": "*myRDSPassword",
"type": "String"
},
{
"description": "RDS MySQL table name",
"id": "myRDSTableName",
"type": "String"
},
{
"helpText": "Override the default mapping of RDS MySQL data types to Jdbc data types.",
"watermark": "tinyint(1):smallint,char(35):varchar(70),bigint(20) unsigned:bigint",
"description": "MySQL to Jdbc type conversion overrides",
"optional": "true",
"id": "myJdbcTypeConvOverrideMap",
"type": "String"
},
{
"description": "Jdbc username",
"id": "myJdbcUsername",
"type": "String"
},
{
"watermark": "columnName",
"helpText": "Distribution key column in the Jdbc table. If the distribution key is not specified the primary key is set as a distribution key.",
"description": "Jdbc table distribution key",
"optional": "true",
"id": "myJdbcDistributionKey",
"type": "String"
},
{
"watermark": "jdbc:mysql://dbinstance.id.region.rds.amazonaws.com:3306/dbname",
"description": "RDS MySQL connection string",
"id": "myRDSJdbcConnectStr",
"type": "String"
},
{
"default": "default",
"watermark": "security group name",
"helpText": "The names of one or more security groups that collectively provide the EC2 instance connectivity to both the RDS instance and Jdbc cluster.",
"description": "RDS and Jdbc security group(s)",
"isArray": "true",
"id": "myRDSJdbcSecurityGrps",
"type": "String"
},
{
"helpText": "The name of an existing table or a new table that will be automatically created.",
"description": "Jdbc table name",
"id": "myJdbcTableName",
"type": "String"
},
{
"description": "Jdbc password",
"id": "*myJdbcPassword",
"type": "String"
},
{
"watermark": "columnName1,columnName2",
"helpText": "Sort key columns in the Jdbc table.",
"description": "Jdbc table sort keys",
"optional": "true",
"id": "myJdbcSortKeys",
"type": "String"
},
{
"description": "RDS MySQL username",
"id": "myRDSUsername",
"type": "String"
},
{
"watermark": "jdbc:postgresql://endpoint:port/database?tcpKeepAlive=true",
"description": "Jdbc connection string",
"id": "myJdbcConnectStr",
"type": "String"
}
],
"values": {
"myS3StagingLoc": "s3://testbucket/temp/",
"*myJdbcPassword": "abcd1234",
"myRDSJdbcConnectStr": "jdbc:mysql://myrdstestins.chqgitoif30b.us-east-1.rds.amazonaws.com:3306/myrdstestdb",
"myJdbcConnectStr": "jdbc:postgresql://abcpostgresqldb.chqgitoif30b.us-east-1.rds.amazonaws.com:5432/dev",
"myJdbcTableName": "nameid2",
"myRDSUsername": "abctest",
"myRDSinstanceid": "myrdstestins",
"myRDSJdbcSecurityGrps": "default",
"myJdbcUsername": "abctest",
"myRDSDatabaseName": "myrdstestdb",
"*myRDSPassword": "abcd1234",
"myRDSTableName": "nameid2",
"myJdbcTableInsertSql": "INSERT INTO #{table}(name, id) VALUES (?,?);"
}
}
Loading

0 comments on commit acc7ed1

Please sign in to comment.