Bulk Insert into an AEP Accelerator Table Created via Query Service
We are trying to load a table that we created in Query Service for an ad-hoc dashboard. The table holds aggregate stats: we pull data from system datasets such as ajo_entity_datasets and journey_step_events (or from the Adobe API), build an aggregate DataFrame, and then load that DataFrame into the table over the PostgreSQL connection.
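For context, the read side of the pipeline looks roughly like the sketch below. This is a minimal illustration, not our exact code: the connection details are redacted, the aggregate query and its column names are hypothetical, and df_m is the DataFrame referenced in the attempts further down.
import pandas as pd
import psycopg2

# Connect to the Query Service PostgreSQL-compatible endpoint (credentials redacted)
conn = psycopg2.connect(database="", sslmode="require", user="", password="", host="", port=80)

# Hypothetical aggregate query over a system dataset; the real query and columns differ
agg_sql = """
    SELECT journey_name, COUNT(1) AS sent
    FROM journey_step_events
    GROUP BY journey_name
"""
df_m = pd.read_sql(agg_sql, conn)  # aggregate stats land in a DataFrame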
Modules used so far:
psycopg2 and sqlalchemy to connect to and load the dataset
pandas for transforming the data into a DataFrame
csv for reading the CSV file
StringIO for treating the CSV data as an in-memory file
Methods tried so far:
1. execute_values from psycopg2 (reading from CSV file)
2. df.to_sql() from SQLAlchemy (reading from DataFrame)
3. copy_expert from psycopg2 (reading from CSV file)
4. copy_from from psycopg2 (reading from CSV file)
5. executemany from psycopg2 (reading from DataFrame and from CSV)
1. execute_values from psycopg2 (reading from CSV file)
Code:
import psycopg2
import psycopg2.extras as extras
import pandas as pd

conn = psycopg2.connect(database="", sslmode="require", user="", password="", host="", port=80)
df = pd.read_csv("test1.csv")

# Create a list of tuples from the dataframe values
tuples = [tuple(x) for x in df.to_numpy()]
# Comma-separated dataframe columns
cols = ','.join(list(df.columns))
print(tuples)

# SQL query to execute
query = "INSERT INTO pp_db.data_ops.test_data (segment_id) VALUES (%s)"
cursor = conn.cursor()
try:
    extras.execute_values(cursor, query, tuples)
    conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
    print("Error: %s" % error)
    conn.rollback()
cursor.close()
print("execute_values() done")
Error Received with the above code:
Error: ErrorCode: 08P01 queryId: 0934a139-c137-445d-ae1e-f9f830acc818 Unknown error encountered. Reason: [(ROW ('ags')) (of class com.facebook.presto.sql.tree.Values)]
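One client-side issue in this attempt, separate from the server error: psycopg2's execute_values expects the query to contain a single bare %s that it replaces with the whole VALUES list, so the template should be VALUES %s rather than VALUES (%s). A corrected sketch follows; it may still fail server-side, since the Presto class name in the error suggests multi-row VALUES is the real problem.
# execute_values expands the lone %s into "(row1), (row2), ..." itself
query = "INSERT INTO pp_db.data_ops.test_data (segment_id) VALUES %s"
extras.execute_values(cursor, query, tuples, page_size=100)
conn.commit()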
2. df.to_sql() from SQLAlchemy (reading from DataFrame)
Code:
import pandas as pd
import time
from sqlalchemy import create_engine, Table, MetaData

df = pd.read_csv("test1.csv")  # read csv file from your local machine

# Example: 'postgresql://username:password@localhost:5432/your_database'
db_string = "postgresql://{user}:{password}@{host}:{port}/{dbname}".format(
    user='', password='', host='', port='', dbname='')
engine = create_engine(db_string)

# Reflect the target table to confirm it is visible to SQLAlchemy
metadata = MetaData()
table = Table('test_data', metadata, autoload_with=engine)
print(table)

start_time = time.time()  # get start time before insert
df1 = pd.DataFrame({'test_id': ['User4', 'User5']})
df1.to_sql(name='test_data', con=engine, if_exists='append', index=False, method="multi")
df.to_sql(
    name='test_data',      # table name
    con=engine,            # engine
    if_exists="append",    # if the table already exists, append
    index=False            # no index
)
'''
end_time = time.time()  # get end time after insert
total_time = end_time - start_time  # calculate the time
print(f"Insert time: {total_time} seconds")  # print time
'''
Error Received:
OperationalError: (psycopg2.errors.SystemError) ErrorCode: 08P01 queryId: 9d610e79-7d3c-4c80-81e4-caaace3042b1 Unknown error encountered. Reason: [('User4', 'User5') (of class com.facebook.presto.sql.tree.Values)]
[SQL: INSERT INTO test_data (segment_id) VALUES (%(segment_id_m0)s), (%(segment_id_m1)s)]
[parameters: {'segment_id_m0': 'User4', 'segment_id_m1': 'User5'}]
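The logged SQL shows the cause: method="multi" makes to_sql emit one INSERT with a multi-row VALUES list, which the Presto-based engine behind the endpoint rejects (note com.facebook.presto.sql.tree.Values in the error). A variant worth trying is the default method, which issues a separate single-row INSERT per record; this sketch assumes the same engine and table, and will likely be slow, as in attempt 5.
# method=None (the default) sends one single-row INSERT per record;
# chunksize only controls how many rows go into each executemany call
df.to_sql(name='test_data', con=engine, if_exists='append',
          index=False, chunksize=500, method=None)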
3. copy_expert from psycopg2 (reading from CSV file)
Code:
import psycopg2
import csv
from datetime import datetime
from io import StringIO

# Establish connection to PostgreSQL
connection = psycopg2.connect(database="", user="", password="", host="", port=80)
cursor = connection.cursor()

# Set the search path to your schema (if necessary)
cursor.execute("SET search_path TO data_ops;")

# COPY statement used for every batch; no HEADER, since the in-memory
# batches are written without a header row
copy_query = """
    COPY pp_db.data_ops.email_name_stats (rec_dt, journey_name, journey_action_name, sent, bounce, delay,
        exclude, error, open_tracked, click_tracked, click_opt_out, unsubscribe, unsubscribe_opt_out, spam_complaint)
    FROM STDIN WITH CSV DELIMITER ',' NULL AS 'NULL'
"""

# Prepare data from CSV
with open('Email_Stats.csv', mode='r') as file:
    csv_data = csv.reader(file)
    my_data = []
    batch_size = 100
    batch_count = 0

    # Skip the header if present
    next(csv_data)

    for row in csv_data:
        # Parse the date and append the full record as a tuple
        rec_dt = datetime.strptime(row[0], '%Y-%m-%d')
        my_data.append((
            rec_dt,    # parsed date
            row[1],    # journey_name
            row[2],    # journey_action_name
            row[3],    # sent
            row[4],    # bounce
            row[5],    # delay
            row[6],    # exclude
            row[7],    # error
            row[8],    # open_tracked
            row[9],    # click_tracked
            row[10],   # click_opt_out
            row[11],   # unsubscribe
            row[12],   # unsubscribe_opt_out
            row[13]    # spam_complaint
        ))
        batch_count += 1
        if batch_count >= batch_size:
            # Use StringIO to simulate a file in memory
            sio = StringIO()
            csv.writer(sio).writerows(my_data)
            sio.seek(0)
            # Run the COPY command explicitly with copy_expert
            cursor.copy_expert(sql=copy_query, file=sio)
            connection.commit()
            # Reset batch data
            my_data = []
            batch_count = 0

    # Flush any remaining rows (last batch smaller than batch_size)
    if my_data:
        sio = StringIO()
        csv.writer(sio).writerows(my_data)
        sio.seek(0)
        cursor.copy_expert(sql=copy_query, file=sio)
        connection.commit()

# Close cursor and connection
cursor.close()
connection.close()
print("Data insertion completed.")
Error Received: We get the error below whether we pass the fully qualified name, just the table name, or schema_name.table_name:
SyntaxError: ErrorCode: 42601 queryId: 93e56a75-790c-4221-9972-640291fe441b Syntax error encountered. Reason: [line 1:6: mismatched input 'pp_db' expecting {'(', 'PREDICT', 'SELECT', 'TABLE', 'VALUES', 'WITH'}]
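The parser message is telling: after COPY it only expects '(', PREDICT, SELECT, TABLE, VALUES or WITH, i.e. the statement is parsed by a Presto-style grammar that has no COPY at all, so no form of table qualification will help (copy_from in the next attempt fails the same way). Below is a fallback sketch that stays inside the grammar the endpoint appears to accept, reusing the my_data batches with single-row INSERT ... SELECT statements; this is a hypothetical workaround, and its throughput is not verified.
insert_sql = """INSERT INTO pp_db.data_ops.email_name_stats
    (rec_dt, journey_name, journey_action_name, sent, bounce, delay, exclude, error,
     open_tracked, click_tracked, click_opt_out, unsubscribe, unsubscribe_opt_out, spam_complaint)
    SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s"""
for record in my_data:
    cursor.execute(insert_sql, record)  # one round-trip per row
connection.commit()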
4. copy_from from psycopg2 (reading from CSV file)
Code:
# copy_from not working
import psycopg2
import csv
from datetime import datetime
from io import StringIO

# Establish connection to PostgreSQL
connection = psycopg2.connect(database="uat:pp_db", user="5CE4123F5245B06C0A490D45@AdobeOrg", password=" ", host=" ", port=80)
cursor = connection.cursor()

# Set the schema search path (if needed)
cursor.execute("SET search_path TO data_ops;")

# df_m is the aggregate-stats DataFrame built earlier
print(len(df_m))
df_updated = df_m.head(500)
df_updated.to_csv("Email_Stats.csv", header=False, index=False)

now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)

with open('Email_Stats.csv', mode='r') as file:
    csv_data = csv.reader(file)
    batch_size = 100
    batch_count = 0
    my_data = []
    for row in csv_data:
        rec_dt = datetime.strptime(row[0], '%Y-%m-%d')
        my_data.append((
            rec_dt,    # parsed date
            row[1],    # journey_name
            row[2],    # journey_action_name
            row[3],    # sent
            row[4],    # bounce
            row[5],    # delay
            row[6],    # exclude
            row[7],    # error
            row[8],    # open_tracked
            row[9],    # click_tracked
            row[10],   # click_opt_out
            row[11],   # unsubscribe
            row[12],   # unsubscribe_opt_out
            row[13]    # spam_complaint
        ))
        batch_count += 1
        if batch_count >= batch_size:
            # Use StringIO to simulate a file in memory
            sio = StringIO()
            csv.writer(sio).writerows(my_data)
            sio.seek(0)
            # Insert the batch using copy_from with the file-like object
            cursor.copy_from(sio, 'email_name_stats', sep=',')
            connection.commit()
            # Reset batch data
            my_data = []
            batch_count = 0

    # Flush the remainder (copy_from needs a file-like object, not a list)
    if my_data:
        sio = StringIO()
        csv.writer(sio).writerows(my_data)
        sio.seek(0)
        cursor.copy_from(sio, 'email_name_stats', sep=',')
        connection.commit()

now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)
Error Received:
SyntaxError: ErrorCode: 42601 queryId: 305b3ce2-e6f9-4560-9e0e-c24ca41da076 Syntax error encountered. Reason: [line 1:6: mismatched input '"email_name_stats"' expecting {'(', 'PREDICT', 'SELECT', 'TABLE', 'VALUES', 'WITH'}]
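This is expected given the copy_expert result above: copy_from is a convenience wrapper that builds the same COPY ... FROM STDIN statement under the hood, so it hits the identical parser limitation. Roughly, the call above is equivalent to the following (the exact generated SQL is our approximation):
# what copy_from(sio, 'email_name_stats', sep=',') sends to the server
cursor.copy_expert("COPY \"email_name_stats\" FROM STDIN WITH DELIMITER AS ','", sio)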
5. executemany from psycopg2 (reading from DataFrame and from CSV)
Code:
# Updated code using psycopg2's executemany, lower volume
import psycopg2
import csv
from datetime import datetime

# df_m is the aggregate-stats DataFrame built earlier
print(len(df_m))
df_updated = df_m
df_updated.to_csv("Email_Stats.csv", header=False, index=False)

# Establish connection to PostgreSQL
connection = psycopg2.connect(database=" ", user="", password=" ", host=" ", port=80)
cursor = connection.cursor()

with open('Email_Stats.csv', mode='r') as file:
    csv_data = csv.reader(file)
    my_data = []
    for row in csv_data:
        rec_dt = datetime.strptime(row[0], '%Y-%m-%d')
        my_data.append((
            rec_dt,    # parsed date
            row[1],    # journey_name
            row[2],    # journey_action_name
            row[3],    # sent
            row[4],    # bounce
            row[5],    # delay
            row[6],    # exclude
            row[7],    # error
            row[8],    # open_tracked
            row[9],    # click_tracked
            row[10],   # click_opt_out
            row[11],   # unsubscribe
            row[12],   # unsubscribe_opt_out
            row[13]    # spam_complaint
        ))

print(len(my_data))

# Parameterized single-row INSERT ... SELECT (the endpoint rejects multi-row VALUES)
query = '''INSERT INTO pp_db.data_ops.email_name_stats (rec_dt, journey_name, journey_action_name,
               sent, bounce, delay, exclude, error,
               open_tracked, click_tracked, click_opt_out, unsubscribe, unsubscribe_opt_out, spam_complaint)
           SELECT
               %s AS rec_dt,
               %s AS journey_name,
               %s AS journey_action_name,
               %s AS sent,
               %s AS bounce,
               %s AS delay,
               %s AS exclude,
               %s AS error,
               %s AS open_tracked,
               %s AS click_tracked,
               %s AS click_opt_out,
               %s AS unsubscribe,
               %s AS unsubscribe_opt_out,
               %s AS spam_complaint
'''

now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)

cursor.executemany(query, my_data)  # reading from file

# Commit the transaction
connection.commit()

now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)
Result:
No error is thrown, but the load is very slow: even 800 records take 25+ minutes.
Time Taken:
25+ minutes
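Since executemany sends one statement per row, most of those 25 minutes is probably per-statement network round-trips rather than insert work. One mitigation worth testing is psycopg2.extras.execute_batch, which packs many statements into each round-trip; whether the AEP endpoint accepts multi-statement batches is an assumption we have not verified.
from psycopg2 import extras

# execute_batch joins page_size statements with ';' and sends them in one
# round-trip, cutting network latency; semantics are otherwise like executemany
extras.execute_batch(cursor, query, my_data, page_size=100)
connection.commit()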