Performing incremental load between AWS S3 and Redshift table.
Step 1: Create a stageing table in AWS Redshift database similar to your target table. Execute below command to load data from S3 to Staging table.
QUERY 1
copy stage.logtracking from 's3://Bucket/FolderName'
credentials aws_access_key_id="S3Accesskey";aws_secret_access_key="S3Secretkey"
delimiter '|' ACCEPTINVCHARS iGNOREHEADER 1;
Step 2: Run the below query to perform incremental load similar to UPSERT/MERGE statement
QUERY 2
-- Start a new transaction
BEGIN TRANSACTION;
-- Update the target table using an inner join with the staging table
-- The join includes a redundant predicate to collocate on the distribution key
-- A filter on saletime enables a range-restricted scan on SALES
UPDATE ga.logtracking
SET usertype= stage.logtracking.usertype, sessioncount= stage.logtracking.sessioncount
, dayssincelastsession = stage.logtracking.dayssincelastsession, userdefinedvalue = stage.logtracking.userdefinedvalue
, date = CAST(stage.logtracking.date AS DATETIME)
, userid = stage.logtracking.userid
, users = stage.logtracking.users
, newusers = stage.logtracking.newusers
, percentnewsessions = stage.logtracking.percentnewsessions
, sessionsperuser = stage.logtracking.sessionsperuser
, timeonsite = stage.logtracking.timeonsite
, checksum = stage.logtracking.checksum
FROM stage.logtracking
WHERE ga.logtracking.checksum = stage.logtracking.checksum;
-- Delete matching rows from the staging table
-- using an inner join with the target table
DELETE FROM stage.logtracking
USING ga.logtracking
WHERE ga.logtracking.checksum = stage.logtracking.checksum;
-- Insert the remaining rows from the staging table into the target table
INSERT INTO ga.logtracking
SELECT usertype, sessioncount, dayssincelastsession, userdefinedvalue, CAST(date AS DATETIME), userid
, users, newusers, percentnewsessions, sessionsperuser, timeonsite, checksum
FROM stage.logtracking;
-- End transaction and commit
END TRANSACTION;