We are trying to load a table that we created in Query Service for an ad hoc dashboard. To populate this table with aggregate stats, we pull data from system datasets such as ajo_entity_datasets and journey_step_events, or from the Adobe API, and then load the DataFrame that holds the aggregate data into PostgreSQL.
Modules used so far:
psycopg2 and sqlalchemy to connect to the database and load the dataset
pandas for transforming the data into a DataFrame
csv for reading the CSV file
StringIO for holding CSV data as an in-memory string
Methods tried so far:
1. execute_values from psycopg2 (reading from CSV file)
2. df.to_sql() from pandas via SQLAlchemy (reading from DataFrame)
3. copy_expert from psycopg2 (reading from CSV file)
4. copy_from from psycopg2 (reading from CSV file)
5. executemany from psycopg2 (reading from DataFrame and from CSV)
1. execute_values from psycopg2 (reading from CSV file)
Code:
import os
import psycopg2
import numpy as np
import psycopg2.extras as extras
from io import StringIO
import pandas as pd
# Attempt 1: bulk-insert the rows of test1.csv with psycopg2.extras.execute_values.
# Connection values are placeholders to be filled in before running. The port
# was left blank in the post; 80 is the value the other snippets in this post
# connect on.
conn = psycopg2.connect(
    database="", sslmode="require", user="", password="", host="", port=80
)
df = pd.read_csv("test1.csv")

# One tuple per dataframe row, in column order.
tuples = [tuple(x) for x in df.to_numpy()]
print(tuples)

# SQL query to execute.
# execute_values needs a single bare %s after VALUES: it expands that one
# placeholder into the whole "(...), (...), ..." row list itself. The post
# originally used "VALUES (%s)", which wraps the generated row list in an extra
# pair of parentheses; the error quoted after this snippet came from that form.
query = "INSERT INTO pp_db.data_ops.test_data (segment_id) VALUES %s"
print(query)

cursor = conn.cursor()
try:
    extras.execute_values(cursor, query, tuples)
    conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
    print("Error: %s" % error)
    conn.rollback()
else:
    # Only report success when the insert and commit actually went through.
    print("execute_values() done")
finally:
    # Release the cursor exactly once and do not leak the connection.
    cursor.close()
    conn.close()
Error Received with the above code:
Error: ErrorCode: 08P01 queryId: 0934a139-c137-445d-ae1e-f9f830acc818 Unknown error encountered. Reason: [(ROW ('ags')) (of class com.facebook.presto.sql.tree.Values)]
2. df.to_sql() from pandas via SQLAlchemy (reading from DataFrame)
Code:
import pandas as pd
from sqlalchemy import create_engine
import time
import csv
from io import StringIO
from sqlalchemy import Table, MetaData
from sqlalchemy import inspect
# Attempt 2: append a DataFrame to the table with pandas' DataFrame.to_sql
# over a SQLAlchemy engine.
df = pd.read_csv("test1.csv")  # read csv file from your local

# Example: 'postgresql://username:password@localhost:5432/your_database'
# Credentials are placeholders. The port was left blank in the post; 80 is the
# value the psycopg2 snippets in this post connect on.
db_string = "postgresql://{user}:{password}@{host}:{port}/{dbname}".format(
    user="", password="", host="", port=80, dbname=""
)
engine = create_engine(db_string)

# Reflect the existing table definition from the database and print it.
metadata = MetaData()
table = Table("test_data", metadata, autoload_with=engine)
print(table)

start_time = time.time()  # get start time before insert

# Two-row smoke test. The column is segment_id, matching the INSERT statement
# echoed in the error output for this attempt (the post's code said test_id).
# NOTE(review): method="multi" makes pandas send one multi-row
# "VALUES (...), (...)" statement; that is the statement shown in the error
# output, so the server may not accept multi-row VALUES - confirm.
df1 = pd.DataFrame({"segment_id": ["User4", "User5"]})
df1.to_sql(
    name="test_data", con=engine, if_exists="append", index=False, method="multi"
)

df.to_sql(
    name="test_data",  # table name
    con=engine,  # engine
    if_exists="append",  # If the table already exists, append
    index=False,  # no index
)

# Report the elapsed time; this was disabled inside a string literal in the
# post, which left start_time unused.
end_time = time.time()  # get end time after insert
total_time = end_time - start_time  # calculate the time
print(f"Insert time: {total_time} seconds")  # print time
Error Received:
OperationalError: (psycopg2.errors.SystemError) ErrorCode: 08P01 queryId: 9d610e79-7d3c-4c80-81e4-caaace3042b1 Unknown error encountered. Reason: [('User4', 'User5') (of class com.facebook.presto.sql.tree.Values)]
[SQL: INSERT INTO test_data (segment_id) VALUES (%(segment_id_m0)s), (%(segment_id_m1)s)]
[parameters: {'segment_id_m0': 'User4', 'segment_id_m1': 'User5'}]
3.copy_expert from psycopg2(reading from CSV file)
Code:
import psycopg2
from psycopg2 import extras
from datetime import datetime
import csv
from io import StringIO
# Attempt 3: load Email_Stats.csv in batches with cursor.copy_expert.

BATCH_SIZE = 100

# Defined once, up front, so the final partial batch can use it even when the
# file holds fewer than BATCH_SIZE rows.
# HEADER is omitted because the in-memory CSV built by rows_to_csv carries no
# header line; with HEADER the first data row of every batch would be skipped.
# NOTE(review): the server rejected this statement immediately after the COPY
# keyword (it expected '(', PREDICT, SELECT, TABLE, VALUES or WITH), which
# suggests its SQL dialect does not accept "COPY <table> FROM STDIN" - confirm
# against the Query Service SQL reference.
COPY_QUERY = """
COPY pp_db.data_ops.email_name_stats (rec_dt, journey_name, journey_action_name, sent, bounce, delay, exclude, error,
open_tracked, click_tracked, click_opt_out, unsubscribe, unsubscribe_opt_out, spam_complaint)
FROM STDIN WITH CSV DELIMITER ',' NULL AS 'NULL'
"""


def parse_row(row):
    """Return the 14 loadable fields of one CSV record as a tuple.

    The first field is parsed as a ``YYYY-MM-DD`` date; the other 13 fields
    (journey_name, journey_action_name, sent, bounce, delay, exclude, error,
    open_tracked, click_tracked, click_opt_out, unsubscribe,
    unsubscribe_opt_out, spam_complaint) are passed through unchanged.

    Raises:
        IndexError: if the record has fewer than 14 fields.
        ValueError: if the first field is not a ``YYYY-MM-DD`` date.
    """
    if len(row) < 14:
        raise IndexError(f"expected 14 fields, got {len(row)}")
    rec_dt = datetime.strptime(row[0], '%Y-%m-%d')
    return (rec_dt, *row[1:14])


def iter_batches(rows, size):
    """Yield successive lists of at most *size* items taken from *rows*."""
    batch = []
    for item in rows:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        # Last batch, smaller than *size*.
        yield batch


def rows_to_csv(rows):
    """Serialize *rows* into an in-memory CSV file positioned at its start."""
    buffer = StringIO()
    csv.writer(buffer).writerows(rows)
    buffer.seek(0)
    return buffer


if __name__ == "__main__":
    # Establish connection to PostgreSQL
    connection = psycopg2.connect(database="", user="", password="", host="", port=80)
    cursor = connection.cursor()
    try:
        # Set the search path to your schema (if necessary)
        cursor.execute("SET search_path TO data_ops;")

        # Prepare data from CSV
        with open('Email_Stats.csv', mode='r', newline='') as file:
            csv_data = csv.reader(file)
            # Skip the header if present (tolerates an empty file).
            next(csv_data, None)
            for batch in iter_batches(map(parse_row, csv_data), BATCH_SIZE):
                # Use COPY command explicitly with copy_expert
                cursor.copy_expert(sql=COPY_QUERY, file=rows_to_csv(batch))
                connection.commit()
    finally:
        # Close cursor and connection
        cursor.close()
        connection.close()
    print("Data insertion completed.")
Error Received: We receive the error below whether we give the fully qualified name, just the table name, or schemaname.tablename.
SyntaxError: ErrorCode: 42601 queryId: 93e56a75-790c-4221-9972-640291fe441b Syntax error encountered. Reason: [line 1:6: mismatched input 'pp_db' expecting {'(', 'PREDICT', 'SELECT', 'TABLE', 'VALUES', 'WITH'}]
4.copy_from from psycopg2(reading from CSV file)
Code:
# COPY from Not working
import psycopg2
from psycopg2 import extras
import csv
from datetime import datetime
from io import StringIO
# Attempt 4: load the first 500 rows of df_m in batches with cursor.copy_from.

BATCH_SIZE = 100


def parse_row(row):
    """Return the 14 loadable fields of one CSV record as a tuple.

    The first field is parsed as a ``YYYY-MM-DD`` date; the other 13 fields
    (journey_name, journey_action_name, sent, bounce, delay, exclude, error,
    open_tracked, click_tracked, click_opt_out, unsubscribe,
    unsubscribe_opt_out, spam_complaint) are passed through unchanged.

    Raises:
        IndexError: if the record has fewer than 14 fields.
        ValueError: if the first field is not a ``YYYY-MM-DD`` date.
    """
    if len(row) < 14:
        raise IndexError(f"expected 14 fields, got {len(row)}")
    rec_dt = datetime.strptime(row[0], '%Y-%m-%d')
    return (rec_dt, *row[1:14])


def iter_batches(rows, size):
    """Yield successive lists of at most *size* items taken from *rows*."""
    batch = []
    for item in rows:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        # Last batch, smaller than *size*.
        yield batch


def rows_to_csv(rows):
    """Serialize *rows* into an in-memory CSV file positioned at its start."""
    buffer = StringIO()
    csv.writer(buffer).writerows(rows)
    buffer.seek(0)
    return buffer


def print_current_time():
    """Print the wall-clock time as HH:MM:SS, to bracket the load."""
    print("Current Time =", datetime.now().strftime("%H:%M:%S"))


if __name__ == "__main__":
    # Establish connection to PostgreSQL
    connection = psycopg2.connect(database="uat:pp_db", user="5CE4123F5245B06C0A490D45@AdobeOrg", password=" ", host=" ", port=80)
    cursor = connection.cursor()
    try:
        # Set the schema search path (if needed)
        cursor.execute("SET search_path TO data_ops;")

        # df_m is not defined in this snippet; presumably it is the aggregate
        # DataFrame built earlier in the same session - confirm.
        print(len(df_m))
        df_updated = df_m.head(500)
        df_updated.to_csv("Email_Stats.csv", header=False, index=False)

        print_current_time()
        with open('Email_Stats.csv', mode='r', newline='') as file:
            for batch in iter_batches(map(parse_row, csv.reader(file)), BATCH_SIZE):
                # copy_from needs a file-like object, so every batch -
                # including the final partial one, which the post passed as a
                # plain list and never committed - goes through an in-memory
                # buffer.
                # NOTE(review): copy_from reads PostgreSQL's text format, not
                # CSV, so fields that csv.writer quotes (e.g. values containing
                # a comma) would not be unquoted by the server - verify the
                # data never needs quoting.
                cursor.copy_from(rows_to_csv(batch), 'email_name_stats', sep=',')
                connection.commit()
        print_current_time()
    finally:
        cursor.close()
        connection.close()
Error Received:
SyntaxError: ErrorCode: 42601 queryId: 305b3ce2-e6f9-4560-9e0e-c24ca41da076 Syntax error encountered. Reason: [line 1:6: mismatched input '"email_name_stats"' expecting {'(', 'PREDICT', 'SELECT', 'TABLE', 'VALUES', 'WITH'}]
5.executemany from psycopg2(reading from Dataframe and from CSV)
Code:
#Updated Code with Execute_Many Function in Python less volume
import psycopg2
from psycopg2 import extras
import csv
from datetime import datetime
# Attempt 5: insert every row of df_m with cursor.executemany.

# Use a parameterized query to safely insert data
INSERT_QUERY = '''INSERT INTO pp_db.data_ops.email_name_stats(rec_dt, journey_name, journey_action_name,
sent, bounce, delay, exclude, error,
open_tracked, click_tracked, click_opt_out, unsubscribe, unsubscribe_opt_out, spam_complaint)
select
(%s) as date,
(%s) as journey_name,
(%s) as journey_action_name,
(%s) as sent,
(%s) as bounce,
(%s) as delay,
(%s) as exclude,
(%s) as error,
(%s) as open_tracked,
(%s) as click_tracked,
(%s) as click_opt_out,
(%s) as unsubscribe,
(%s) as unsubscribe_opt_out,
(%s) as spam_complaint
'''


def parse_row(row):
    """Return the 14 loadable fields of one CSV record as a tuple.

    The first field is parsed as a ``YYYY-MM-DD`` date; the other 13 fields
    (journey_name, journey_action_name, sent, bounce, delay, exclude, error,
    open_tracked, click_tracked, click_opt_out, unsubscribe,
    unsubscribe_opt_out, spam_complaint) are passed through unchanged.

    Raises:
        IndexError: if the record has fewer than 14 fields.
        ValueError: if the first field is not a ``YYYY-MM-DD`` date.
    """
    if len(row) < 14:
        raise IndexError(f"expected 14 fields, got {len(row)}")
    rec_dt = datetime.strptime(row[0], '%Y-%m-%d')
    return (rec_dt, *row[1:14])


def print_current_time():
    """Print the wall-clock time as HH:MM:SS, to bracket the load."""
    print("Current Time =", datetime.now().strftime("%H:%M:%S"))


if __name__ == "__main__":
    # df_m is not defined in this snippet; presumably it is the aggregate
    # DataFrame built earlier in the same session - confirm.
    print(len(df_m))
    df_updated = df_m
    df_updated.to_csv("Email_Stats.csv", header=False, index=False)

    with open('Email_Stats.csv', mode='r', newline='') as file:
        my_data = [parse_row(row) for row in csv.reader(file)]
    print(len(my_data))

    # Establish connection to PostgreSQL
    connection = psycopg2.connect(database=" ", user="", password=" ", host=" ", port=80)
    cursor = connection.cursor()
    try:
        print_current_time()
        # Reading from File.
        # NOTE(review): executemany sends one INSERT per row (psycopg2
        # documents it as no faster than calling execute() in a loop), which is
        # the likely reason ~800 rows take 25+ minutes here. Grouping many rows
        # into each statement should be evaluated against what the server's SQL
        # dialect accepts.
        cursor.executemany(INSERT_QUERY, my_data)
        # Commit the transaction; psycopg2 does not autocommit by default.
        connection.commit()
        print_current_time()
    finally:
        cursor.close()
        connection.close()
Error Received:
No error is thrown, but the load is very slow: even 800 records take 25+ minutes.
Time Taken:
25+ mins
Topics help categorize Community content and increase your ability to discover relevant content.
Views
Replies
Total Likes
Hi @ayminocha ,
I understand that Query Service errors are at times not very helpful, and that this can be frustrating (I can say so from first-hand experience).
First of all, from the error alone, without knowing the structure of your target dataset (the pp_db.data_ops.test_data schema) and your input data, it is not possible to gain much insight into why most of your attempts failed.
I have two suggestions:
-- The above are merely suggestions; I do not have much insight into why most of your ingestion attempts failed or why the last one takes so long to execute.
@dhanesh04s @_Andy_ @SSampsa @itsme_ssj @dwright-adobe @John_Man @abhinavbalooni @SubeAEP @MeitMedia @Jennifer_Andrews If you have a moment, please review this question and see if you can help. Thanks in advance!
Views
Replies
Total Likes
@ayminocha Did you find the suggestion helpful? Please let us know if you need more information. If a response worked, kindly mark it as correct for posterity; alternatively, if you found a solution yourself, we’d appreciate it if you could share it with the community. Thank you!
Views
Replies
Total Likes