
Bulk Insert into an AEP Accelerator Table Created via Query Service


Level 1

We are trying to load a table that we created in Query Service for an ad hoc dashboard. The table holds aggregate stats: we pull the data from system datasets such as ajo_entity_datasets and journey_step_events (or from the Adobe API), aggregate it in a pandas DataFrame, and then load the DataFrame into the table over the PostgreSQL interface.
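For context, a minimal sketch of the overall flow (connection details are placeholders, and the aggregate query shown is purely illustrative, not our real query):

import pandas as pd
import psycopg2

# Query Service PostgreSQL interface (placeholder credentials, same shape as the attempts below)
conn = psycopg2.connect(database="", sslmode="require", user="", password="", host="", port=80)

# Illustrative aggregate pull from a system dataset into a DataFrame
df = pd.read_sql(
    "SELECT journey_name, count(*) AS sent FROM journey_step_events GROUP BY journey_name",
    conn,
)

# df is then transformed and bulk-inserted into the accelerator table
# using the methods described below.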

 

Modules used so far:

psycopg2 and sqlalchemy to connect to and load the dataset

pandas for transforming the data into a DataFrame

csv for reading the CSV file

StringIO for holding the CSV content as an in-memory file

 

Methods tried so far:

1. execute_values from psycopg2 (reading from a CSV file)

2. df.to_sql() from sqlalchemy (reading from a DataFrame)

3. copy_expert from psycopg2 (reading from a CSV file)

4. copy_from from psycopg2 (reading from a CSV file)

5. executemany from psycopg2 (reading from a DataFrame and from a CSV file)

 

1. execute_values from psycopg2 (reading from a CSV file)

Code:

import os
import psycopg2
import numpy as np
import psycopg2.extras as extras
from io import StringIO
import pandas as pd

conn = psycopg2.connect(database="", sslmode="require",user="", password="",host="",port=)

df = pd.read_csv("test1.csv")

 

# Create a list of tuples from the dataframe values
tuples = [tuple(x) for x in df.to_numpy()]
# Comma-separated dataframe columns
#print(tuples)
et=[]
#rows = list(zip(df.segment_id, df.created_by))
cols = ','.join(list(df.columns))
#print(rows)
print(tuples)
# SQL query to execute

 

query = "INSERT INTO pp_db.data_ops.test_data (segment_id) VALUES (%s)"
cursor = conn.cursor()
print(query)
try:
    print(query)
    extras.execute_values(cursor, query,tuples)
    conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
    print("Error: %s" % error)
    conn.rollback()
    cursor.close()
print("execute_values() done")
cursor.close()

Error Received with the above code:

Error: ErrorCode: 08P01 queryId: 0934a139-c137-445d-ae1e-f9f830acc818 Unknown error encountered. Reason: [(ROW ('ags')) (of class com.facebook.presto.sql.tree.Values)]
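One thing worth noting (it may not be the whole story, since the error comes back from the Presto-style parser): psycopg2's execute_values expects a single bare %s placeholder where it substitutes the whole list of row tuples, not (%s). A minimal sketch of that form, reusing the table and column above:

import psycopg2
import psycopg2.extras as extras
import pandas as pd

conn = psycopg2.connect(database="", sslmode="require", user="", password="", host="", port=80)
df = pd.read_csv("test1.csv")
tuples = [tuple(x) for x in df.to_numpy()]

# execute_values expands the single %s into a multi-row VALUES list,
# e.g. VALUES ('a'), ('b'), ('c')
query = "INSERT INTO pp_db.data_ops.test_data (segment_id) VALUES %s"

cursor = conn.cursor()
try:
    extras.execute_values(cursor, query, tuples, page_size=500)
    conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
    print("Error: %s" % error)
    conn.rollback()
finally:
    cursor.close()

Whether the endpoint then accepts the resulting multi-row VALUES list is a separate question; if it does not, per-row inserts with batching (see attempt 5 and the note after it) may be the fallback.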

 

2. df.to_sql() from sqlalchemy (reading from a DataFrame)

 

Code:

import pandas as pd
from sqlalchemy import create_engine
import time
import csv
from io import StringIO
from sqlalchemy import Table, MetaData
from sqlalchemy import inspect

df = pd.read_csv("test1.csv")  # read csv file from your local

# Example: 'postgresql://username:password@localhost:5432/your_database'
db_string = "postgresql://{user}:{password}@{host}:{port}/{dbname}".format(
    user='', password="", host='', port=, dbname=""
)

engine = create_engine(db_string)

metadata = MetaData()
table = Table('test_data', metadata, autoload_with=engine)
print(table)

#print(engine)

start_time = time.time()  # get start time before insert

df1 = pd.DataFrame({'test_id': ['User4', 'User5']})
df1.to_sql(name='test_data', con=engine, if_exists='append', index=False, method="multi")

df.to_sql(
    name='test_data',      # table name
    con=engine,            # engine
    if_exists="append",    # if the table already exists, append
    index=False            # no index
)

'''
end_time = time.time()  # get end time after insert
total_time = end_time - start_time  # calculate the time
print(f"Insert time: {total_time} seconds")  # print time
'''

Error Received:

OperationalError: (psycopg2.errors.SystemError) ErrorCode: 08P01 queryId: 9d610e79-7d3c-4c80-81e4-caaace3042b1 Unknown error encountered. Reason: [('User4', 'User5') (of class com.facebook.presto.sql.tree.Values)]

[SQL: INSERT INTO test_data (segment_id) VALUES (%(segment_id_m0)s), (%(segment_id_m1)s)]
[parameters: {'segment_id_m0': 'User4', 'segment_id_m1': 'User5'}]
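The multi-row VALUES list in the generated SQL is exactly what appears in the error, so one hedged variant is to drop method="multi" and let to_sql fall back to its default per-row executemany, with chunksize bounding each batch. This is only a sketch (it may simply trade the parse error for the slowness seen in attempt 5), and the schema argument is an assumption based on the data_ops search path used elsewhere in this post:

import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection string, same shape as above
db_string = "postgresql://{user}:{password}@{host}:{port}/{dbname}".format(
    user="", password="", host="", port=80, dbname=""
)
engine = create_engine(db_string)

df = pd.read_csv("test1.csv")

# method=None (the default) issues one parameterised INSERT per row via
# executemany instead of a single multi-row VALUES statement.
df.to_sql(
    name="test_data",
    con=engine,
    schema="data_ops",   # assumption: the table lives in the data_ops schema
    if_exists="append",
    index=False,
    chunksize=500,
)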

 

3. copy_expert from psycopg2 (reading from a CSV file)

 

Code:

import psycopg2
from psycopg2 import extras
from datetime import datetime
import csv
from io import StringIO

# Establish connection to PostgreSQL
connection = psycopg2.connect(database="", user="", password="", host="", port=80)
cursor = connection.cursor()

# Set the search path to your schema (if necessary)
cursor.execute("SET search_path TO data_ops;")

# Prepare data from CSV
with open('Email_Stats.csv', mode='r') as file:
    csv_data = csv.reader(file)
    my_data = []
    batch_size = 100
    batch_count = 0

    # Skip the header if present
    next(csv_data)

    for row in csv_data:
        # Parse date
        rec_dt = datetime.strptime(row[0], '%Y-%m-%d')

        # Append to my_data as a tuple
        my_data.append((
            rec_dt,   # parsed date
            row[1],   # journey_name
            row[2],   # journey_action_name
            row[3],   # sent
            row[4],   # bounce
            row[5],   # delay
            row[6],   # exclude
            row[7],   # error
            row[8],   # open_tracked
            row[9],   # click_tracked
            row[10],  # click_opt_out
            row[11],  # unsubscribe
            row[12],  # unsubscribe_opt_out
            row[13]   # spam_complaint
        ))

        batch_count += 1

        if batch_count >= batch_size:
            # Use StringIO to simulate a file in memory
            sio = StringIO()
            csv_writer = csv.writer(sio)
            csv_writer.writerows(my_data)
            sio.seek(0)

            # Use COPY command explicitly with copy_expert
            copy_query = """
            COPY pp_db.data_ops.email_name_stats (rec_dt, journey_name, journey_action_name, sent, bounce, delay, exclude, error,
                open_tracked, click_tracked, click_opt_out, unsubscribe, unsubscribe_opt_out, spam_complaint)
            FROM STDIN WITH CSV HEADER DELIMITER ',' NULL AS 'NULL'
            """

            cursor.copy_expert(sql=copy_query, file=sio)
            connection.commit()

            # Reset batch data
            my_data = []
            batch_count = 0

# Insert any remaining rows (in case the last batch is smaller than batch_size)
if my_data:
    sio = StringIO()
    csv_writer = csv.writer(sio)
    csv_writer.writerows(my_data)
    sio.seek(0)

    cursor.copy_expert(sql=copy_query, file=sio)
    connection.commit()

# Close cursor and connection
cursor.close()
connection.close()

print("Data insertion completed.")

 

Error received: We get the error below regardless of whether we use the fully qualified name, just the table name, or schema_name.table_name.

SyntaxError: ErrorCode: 42601 queryId: 93e56a75-790c-4221-9972-640291fe441b Syntax error encountered. Reason: [line 1:6: mismatched input 'pp_db' expecting {'(', 'PREDICT', 'SELECT', 'TABLE', 'VALUES', 'WITH'}]
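The 42601 error is a parse failure on the COPY statement itself (the parser expects a query keyword at that position), which suggests COPY ... FROM STDIN may not be supported on this endpoint at all, rather than a problem with the qualified name. A hedged fallback that keeps the same CSV reading but sends parameterised INSERTs instead of COPY could look like the sketch below; table and column names are as above, and whether the endpoint accepts the resulting multi-row VALUES list is uncertain (see attempt 1):

import csv
from datetime import datetime
import psycopg2
import psycopg2.extras as extras

connection = psycopg2.connect(database="", user="", password="", host="", port=80, sslmode="require")
cursor = connection.cursor()

insert_sql = """
    INSERT INTO pp_db.data_ops.email_name_stats
        (rec_dt, journey_name, journey_action_name, sent, bounce, delay, exclude, error,
         open_tracked, click_tracked, click_opt_out, unsubscribe, unsubscribe_opt_out, spam_complaint)
    VALUES %s
"""

with open("Email_Stats.csv", mode="r") as file:
    reader = csv.reader(file)
    next(reader)  # skip the header row, as in the attempt above
    rows = [(datetime.strptime(r[0], "%Y-%m-%d"), *r[1:14]) for r in reader]

# execute_values sends the rows in multi-row VALUES batches of page_size rows
extras.execute_values(cursor, insert_sql, rows, page_size=100)
connection.commit()
cursor.close()
connection.close()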

 

4. copy_from from psycopg2 (reading from a CSV file)

Code:

 

# copy_from not working

import psycopg2
from psycopg2 import extras
import csv
from datetime import datetime
from io import StringIO

# Establish connection to PostgreSQL
connection = psycopg2.connect(database="uat:pp_db", user="5CE4123F5245B06C0A490D45@AdobeOrg", password=" ", host=" ", port=80)
cursor = connection.cursor()

# Set the schema search path (if needed)
cursor.execute("SET search_path TO data_ops;")

print(len(df_m))
df_updated = df_m.head(500)
df_updated.to_csv("Email_Stats.csv", header=False, index=False)

df_updated = df_m.head(10)
#print(df_updated)

now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)

with open('Email_Stats.csv', mode='r') as file:
    csv_data = csv.reader(file)
    batch_size = 100
    batch_count = 0
    my_data = []

    for row in csv_data:
        rec_dt = datetime.strptime(row[0], '%Y-%m-%d')
        my_data.append((
            rec_dt,   # parsed date
            row[1],   # journey_name
            row[2],   # journey_action_name
            row[3],   # sent
            row[4],   # bounce
            row[5],   # delay
            row[6],   # exclude
            row[7],   # error
            row[8],   # open_tracked
            row[9],   # click_tracked
            row[10],  # click_opt_out
            row[11],  # unsubscribe
            row[12],  # unsubscribe_opt_out
            row[13]   # spam_complaint
        ))

        batch_count += 1

        if batch_count >= batch_size:
            # Use StringIO to simulate a file in memory
            sio = StringIO()
            csv_writer = csv.writer(sio)
            csv_writer.writerows(my_data)
            sio.seek(0)

            # Insert the batch using copy_from with the file-like object
            cursor.copy_from(sio, 'email_name_stats', sep=',')
            connection.commit()

            # Reset batch data
            my_data = []
            batch_count = 0

# Flush any remaining rows (copy_from needs a file-like object, not a list)
if my_data:
    sio = StringIO()
    csv.writer(sio).writerows(my_data)
    sio.seek(0)
    cursor.copy_from(sio, 'email_name_stats', sep=',')
    connection.commit()

now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)

 

Error Received:

 
SyntaxError: ErrorCode: 42601 queryId: 305b3ce2-e6f9-4560-9e0e-c24ca41da076 Syntax error encountered. Reason: [line 1:6: mismatched input '"email_name_stats"' expecting {'(', 'PREDICT', 'SELECT', 'TABLE', 'VALUES', 'WITH'}]

 

5. executemany from psycopg2 (reading from a DataFrame and from a CSV file)

Code:

# Updated code using executemany, lower volume

import psycopg2
from psycopg2 import extras
import csv
from datetime import datetime

print(len(df_m))
df_updated = df_m
df_updated.to_csv("Email_Stats.csv", header=False, index=False)
df_updated = df_m.head(10)
#print(df_updated)

# Establish connection to PostgreSQL
connection = psycopg2.connect(database=" ", user="", password=" ", host=" ", port=80)
cursor = connection.cursor()

with open('Email_Stats.csv', mode='r') as file:
    csv_data = csv.reader(file)
    my_data = []
    for row in csv_data:
        rec_dt = datetime.strptime(row[0], '%Y-%m-%d')
        my_data.append((
            rec_dt,   # parsed date
            row[1],   # journey_name
            row[2],   # journey_action_name
            row[3],   # sent
            row[4],   # bounce
            row[5],   # delay
            row[6],   # exclude
            row[7],   # error
            row[8],   # open_tracked
            row[9],   # click_tracked
            row[10],  # click_opt_out
            row[11],  # unsubscribe
            row[12],  # unsubscribe_opt_out
            row[13]   # spam_complaint
        ))

print(len(my_data))

#rows = list(zip(df_updated.rec_dt, df_updated.journey_name, df_updated.journey_action_name, df_updated.sent, df_updated.bounce, df_updated.delay, df_updated.exclude, df_updated.error, df_updated.open_tracked, df_updated.click_tracked, df_updated.click_opt_out, df_updated.unsubscribe, df_updated.unsubscribe_opt_out, df_updated.spam_complaint))
#print(rows)

# Use a parameterized query to safely insert data
query = '''INSERT INTO pp_db.data_ops.email_name_stats(rec_dt, journey_name, journey_action_name,
                    sent, bounce, delay, exclude, error,
                    open_tracked, click_tracked, click_opt_out, unsubscribe, unsubscribe_opt_out, spam_complaint)
                    SELECT
                    (%s) as date,
                    (%s) as journey_name,
                    (%s) as journey_action_name,
                    (%s) as sent,
                    (%s) as bounce,
                    (%s) as delay,
                    (%s) as exclude,
                    (%s) as error,
                    (%s) as open_tracked,
                    (%s) as click_tracked,
                    (%s) as click_opt_out,
                    (%s) as unsubscribe,
                    (%s) as unsubscribe_opt_out,
                    (%s) as spam_complaint
                    '''

now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)

#cursor.executemany(query, rows)   # commented out: testing executemany after reading from a file

cursor.executemany(query, my_data)  # reading from the file

#cursor.execute_bulk(query, my_data)
# Commit the transaction if necessary
#connection.commit()

now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)

 

Error received:

No error is thrown, but the load is very slow: even 800 records take 25+ minutes.

Time taken:

25+ minutes
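If single-row INSERTs are the only form this endpoint accepts, most of that time is probably per-statement round-trip overhead, since executemany simply loops over the rows. A hedged alternative is psycopg2.extras.execute_batch, which packs several INSERT statements into each round trip; it does not change the SQL of any individual INSERT, and whether the endpoint accepts multiple statements per call is an open question. A minimal sketch, reusing the same table and a plain VALUES form:

import csv
from datetime import datetime
import psycopg2
import psycopg2.extras as extras

connection = psycopg2.connect(database=" ", user="", password=" ", host=" ", port=80)
cursor = connection.cursor()

# Build the row tuples from the CSV written above (header=False, so no header skip)
with open("Email_Stats.csv", mode="r") as file:
    my_data = [(datetime.strptime(r[0], "%Y-%m-%d"), *r[1:14]) for r in csv.reader(file)]

# Plain single-row INSERT with one placeholder per column
query = """INSERT INTO pp_db.data_ops.email_name_stats
               (rec_dt, journey_name, journey_action_name, sent, bounce, delay, exclude, error,
                open_tracked, click_tracked, click_opt_out, unsubscribe, unsubscribe_opt_out, spam_complaint)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""

# page_size controls how many INSERT statements are sent per server round trip
extras.execute_batch(cursor, query, my_data, page_size=100)
connection.commit()
cursor.close()
connection.close()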

 

 

 


3 Replies


Level 4

Hi @ayminocha ,

I understand that Query Service errors are at times not really helpful, and that can feel frustrating (I can say so from first-hand experience).

 

First of all, looking at the errors alone, without knowing your target dataset structure (the pp_db.data_ops.test_data schema) and your input data, it is hard to say much about why most of your attempts failed.

I have two suggestions:

  1. When you run any query from Python against AEP, the same error is recorded in the 'Log' tab in the AEP environment. The message itself will be the same, but you will see which query failed, and that may help you debug it.
  2. Instead of inserting the data via a query, it might help to create a (dummy) schema/dataset pair and bring your CSV data into AEP through ingestion. Once the data is in, you can use Query Service to move it from there into your target dataset (a rough sketch follows below).
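A rough sketch of what that second suggestion could look like once the CSV has been ingested into a dummy dataset. The names here are hypothetical (email_stats_staging for the staging dataset, email_name_stats for your target table), and the exact SQL depends on your schemas:

import psycopg2

# Connect to the Query Service endpoint (placeholder credentials)
conn = psycopg2.connect(database="", user="", password="", host="", port=80, sslmode="require")
cursor = conn.cursor()

# Move the ingested rows from the staging dataset into the target table
cursor.execute("""
    INSERT INTO pp_db.data_ops.email_name_stats
    SELECT rec_dt, journey_name, journey_action_name, sent, bounce, delay, exclude, error,
           open_tracked, click_tracked, click_opt_out, unsubscribe, unsubscribe_opt_out, spam_complaint
    FROM email_stats_staging
""")
conn.commit()
cursor.close()
conn.close()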

 

-- The above are mere suggestions; I do not have much insight into why most of your ingestion attempts failed or why the last one takes so long to execute.


Administrator

@dhanesh04s @_Andy_ @SSampsa @itsme_ssj @dwright-adobe @John_Man @abhinavbalooni @SubeAEP @MeitMedia @Jennifer_Andrews If you have a moment, please review this question and see if you can help. Thanks in advance!



Kautuk Sahni


Administrator

@ayminocha Did you find the suggestion helpful? Please let us know if you need more information. If a response worked, kindly mark it as correct for posterity; alternatively, if you found a solution yourself, we’d appreciate it if you could share it with the community. Thank you!



Kautuk Sahni