ETL - Using Python to push data to Salesforce Analytics

The problem: develop a method to automate the process of pushing data from our Netezza reporting warehouse to Salesforce Analytics for consolidated reporting.

In my quest to solve this problem, I created the Python script below.

The commented-out yum and pip install commands at the top of the script are prerequisites; run them before running the script.

Sharing the data set is outside the scope of this script, but it is easily solved: first create the data set and share it out, then use the script to overwrite it.


# sudo yum install python-devel
# sudo pip install ibm_db
# sudo pip install simple_salesforce
# sudo pip install jaydebeapi

from simple_salesforce import Salesforce
import requests
import jaydebeapi
import sys
import csv
import base64
import os
import tempfile
import math

# ---- script settings: debug switch and the local files the job uses ----
debug = True
fname = "./data/temp.csv"                           # CSV extract written from the query results
errlog = "./log/errorlog.txt"                       # file the error handlers write to
metadata = "./metadata/SfdcMedalliaMetadata.json"   # metadata JSON sent with the data header
driver = "./drivers/nzjdbc.jar"                     # Netezza JDBC driver jar

# ---- database connection settings ----
dsn_database = "mydatabase"
dsn_hostname = "myhost.ibm.com"
dsn_port = "2222"
dsn_uid = "myuid"
dsn_pwd = "mypass"
jdbc_driver_name = "org.netezza.Driver"
jdbc_driver_loc = driver
# the JDBC URL has the form jdbc:netezza://<host>:<port>/<database>
connection_string = ''.join(
    ['jdbc:netezza://', dsn_hostname, ':', dsn_port, '/', dsn_database])
# the same URL with the user id and password appended
url = '{0}:user={1};password={2}'.format(connection_string, dsn_uid, dsn_pwd)

# ---- Salesforce login settings ----
sf_grant_type = "password"
sf_client_id = "mylongstring"
sf_client_secret = "mysecret"
sf_username = "myusername"
sf_password = "mypassword"
sf_url = "https://test.salesforce.com"

if debug: print 'URL: ', format(url)
if debug: print 'Connection String: ', format(connection_string)

# sql
sql = ("select promoter_type, survey_response_date, survey_id, likelihood_to_recommend, support_satisfaction, ticket_num from MEDALLIA.DRILL_SURVEY")

# Open the JDBC connection and a cursor. The driver class and jar come from
# the jdbc_driver_name / jdbc_driver_loc settings instead of repeated literals.
try:
    conn = jaydebeapi.connect(jdbc_driver_name, connection_string,
                              {'user': dsn_uid, 'password': dsn_pwd},
                              jars=jdbc_driver_loc)
    curs = conn.cursor()
except jaydebeapi.Error as e:
    if debug: print 'Error in connecting to the database.'
    with open(errlog, "wb") as errlg:
        errlg.write("Error in connecting to the database.")
        # write the exception text; the exception object itself is not a string
        errlg.write(str(e))
    # exit with a non-zero status so a scheduler can see that the job failed
    sys.exit(1)

#----------------------------
# Split function found here: https://stackoverflow.com/questions/30947682/splitting-a-csv-file-into-equal-parts
#----------------------------
def split(infilename, num_chunks, verbose=None):
    """Split a file into ``num_chunks`` temporary files on line boundaries.

    The target size of a chunk is ``file size // num_chunks``. Lines are
    copied whole, so a chunk stops after the first line that reaches the
    target. The last chunk receives every remaining line, so no trailing
    data is dropped when the earlier chunks end exactly on their target.

    Args:
        infilename: path of the file to split; it is read in binary mode.
        num_chunks: number of temporary files to produce (at least 1).
        verbose: print size information when true. When left as ``None``
            the module-level ``debug`` flag is used.

    Returns:
        A list of ``num_chunks`` open temporary files, each rewound to its
        start. Chunks may be empty when the input has fewer lines than
        chunks. The caller is responsible for closing them.

    Raises:
        ValueError: if ``num_chunks`` is smaller than 1.
    """
    if num_chunks < 1:
        raise ValueError('num_chunks must be at least 1')
    if verbose is None:
        verbose = debug
    READ_BUFFER = 2**13
    in_file_size = os.path.getsize(infilename)
    # single-argument print(...) gives the same output on Python 2 and 3
    if verbose: print('SPLIT() in_file_size: %s' % in_file_size)
    chunk_size = in_file_size // num_chunks
    if verbose: print('SPLIT(): target chunk_size: %s' % chunk_size)
    files = []
    try:
        with open(infilename, 'rb', READ_BUFFER) as infile:
            for index in range(num_chunks):
                last_chunk = (index == num_chunks - 1)
                temp_file = tempfile.TemporaryFile()
                files.append(temp_file)
                # the last chunk ignores the size target and drains the input
                while last_chunk or temp_file.tell() < chunk_size:
                    line = infile.readline()
                    if not line:  # end of infile
                        break
                    temp_file.write(line)
                temp_file.seek(0)  # rewind
    except Exception:
        # do not leak the temporary files created so far
        for temp_file in files:
            temp_file.close()
        raise
    return files

# fetch the data
try:
    curs.execute(sql)
    result = curs.fetchall()
    # get the column names for writing the csv header
    col_names = [i[0] for i in curs.description]
    if debug: print 'Total records fetched : ',str(len(result))
except jaydebeapi.Error as e:
    if debug: print 'Error in fetching database results.'
    with open(errlog, "wb") as errlg:
        errlg.write("Error in fetching database results.")
        errlg.write(e)
        errlg.flush()
        exit()
finally:
    # clean up the connection
    curs.close()
    conn.close()


# write the header row followed by all fetched rows to the CSV extract
with open(fname, "wb") as myfile:
    wrtr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
    wrtr.writerow(col_names)
    # one writerows() call; the file is flushed once when the with-block
    # closes it, instead of forcing a flush after every single row
    wrtr.writerows(result)
  

# set parameters for SFDC login
params = {
    "grant_type": sf_grant_type,
    "client_id": sf_client_id,
    "client_secret": sf_client_secret,
    "username": sf_username,
    "password": sf_password
}

try:
    # make the request and get the access_token and instance_url for future posts
    r = requests.post(
        sf_url + "/services/oauth2/token", params=params)
    # store the tocken and instance url
    access_token = r.json().get("access_token")
    instanceUrl = r.json().get("instance_url")
    if debug: print 'Login to Salesforce : access token is ',str(access_token)
except Exception as e:
    if debug: print 'Error in logging into Salesforce.'
    with open(errlog, "wb") as errlg:
        errlg.write("Error posting Auth request to Salesforce.")
        errlg.write(e)
        errlg.flush()
        exit()


# Salesforce client bound to the instance url and session token from the login
sf = Salesforce(instance_url=instanceUrl, session_id=access_token)

# read the metadata file and base64-encode its contents for the data header
with open(metadata, "r") as mdata:
    raw_metadata = mdata.read()
mdata_contents = base64.b64encode(str(raw_metadata))

# insert the header record
try:
    res_header = sf.InsightsExternalData.create({
        'Format': 'Csv',
        'EdgemartAlias': 'MedalliaSurveys',
        'EdgemartLabel': 'Medallia Surveys',
        'Description': 'Test of API load.',
        'FileName': 'MedalliaSurveys',
        'MetadataJson': mdata_contents,
        'Operation': 'Overwrite',
        'Action': 'None'
    })
    # retrieve the new header id for use with the data parts
    header_id = res_header.get('id')
    if debug: print 'Created data header. Id is ',str(header_id)
except Exception as e:
    if debug: print 'Error in writing data header.'
    with open(errlog, "wb") as errlg:
        errlg.write("Error writing data header to Salesforce.")
        errlg.write(str(e))
        errlg.flush()
        exit()

# if the file is larger than 10mb,
# it needs to be broken up in chunks
fsize = os.stat(fname).st_size

try:
    if (fsize > 10000000):
        if debug: print 'File needs to be chunked, size is : ',str(fsize)
        num_chunks = int(math.ceil(float(fsize) / float(10000000)))
        files = split(fname, num_chunks)
        if debug: print 'Number of files created: ', format(len(files))
        for i, ifile in enumerate(files, start=1):
            f_contents = base64.b64encode(str(ifile.read()))
            res_data = sf.InsightsExternalDataPart.create({
                    'DataFile': f_contents,
                    'InsightsExternalDataId': header_id,
                    'PartNumber': str(i)
                })
    else:
        if debug: print 'File is fine to post in single part.'
        # base64 encode the data file
        with open(fname, "r") as f:
            f_contents = base64.b64encode(str(f.read()))
            res_data = sf.InsightsExternalDataPart.create({
                'DataFile': f_contents,
                'InsightsExternalDataId': header_id,
                'PartNumber': '1'
            })
        if debug: print 'The data part created is : ',str(res_data.get('id'))
except Exception as e:
    if debug: print 'Error in writing data part.'
    with open(errlog, "wb") as errlg:
        errlg.write("Error writing data part to Salesforce.")
        errlg.write(str(e))
        errlg.flush()
        exit()


try:
    res_proc = sf.InsightsExternalData.update(header_id, {
    'Action': 'Process'
        })
    if debug: print 'The result of the processing the data is : ',str(res_proc)
except Exception as e:
    if debug: print 'Error in Updating action of data header.' 
    with open(errlog, "wb") as errlg:
        errlg.write("Error processing data in Salesforce.")
        errlg.write(str(e))
        errlg.flush()
        exit()    

Comments

Anonymous said…
Hey John,
Please help me with your email address, I need some help with connecting Transoft USQL to SQL Server or any Database engine.
At the point when you travel to Sharjah, there is certifiably not a solitary possibility that you missing one of its most lovely urban communities. Sharjah is a trendy and modern city, being the world's style capital. It is further known for the glow and neighborliness of the Sharjah Escorts Agency. You will be entranced by the excellence and appeal of Sharjah Escorts Service and Dubai Escorts Services, as no other Indian Escorts in Dubai is that alluring around the spot. Sharjah Escort Service
Pakistani Escorts in Sharjah will love investing their energy with intriguing men of honor who visit the city and solicitation for Pakistani Call Girls In Sharjah. With a straightforward call, you will locate an enchanting and rich Kashmiri Escorts In Dubai ally for a supper. She can even be your manual for the city. Out of Malayali Escorts In Sharjah - Indian Call Girls in Sharjah, Turkish Escorts in Sharjah, or Russian Escorts in Sharjah, Call Girl in Sharjah administration can offer you the most wonderful and energetic experience in another city with life-changing recollections of your stay here.

Popular posts from this blog

freeFTPD

Traveling via Amtrak

DotNetNuke SEO