Accessing Data Sources with Python
This section describes how to access HDFS, Hive, Kafka, S3, and Blob data sources with Python.
Success and Failure Judgement
Define a main function that returns a value indicating the task result:
def main():
    print(1)
    return 0
The following example shows a successful task, returning 0:
def main():
    print('The task runs successfully')
    return 0
A task fails in either of the following situations:
The return value is not 0.
The program throws an exception, which causes the run to fail.
For example:
def main():
    print('The task fails')
    return 'fail'

def main():
    print('The task fails')
    return -1

def main():
    print('The task fails')
    raise Exception('raise exception, let the task fail')
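Since an uncaught exception also fails the task, a common pattern is to wrap the task body in a try/except block and convert any exception into an explicit non-zero return value. A minimal sketch, assuming a hypothetical do_work helper:

def do_work():
    # Hypothetical task body; replace with your own logic.
    print('doing work')

def main():
    try:
        do_work()
        return 0
    except Exception as e:
        # An uncaught exception would fail the task anyway; returning -1
        # makes the failure explicit and lets you log the cause first.
        print('The task fails: %s' % e)
        return -1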
HDFS
Use the following methods to access HDFS with Python:
import hdfs

def main():
    # List the files under the root directory of the HDFS of the current OU.
    res = hdfs.listDir('/')
    if res == 0:
        return 0
    else:
        return -1
import hdfs

def main():
    # Append the content of a local file to a file on HDFS.
    # res = hdfs.appendFile('File path on HDFS', 'Local file path')
    # In the following example, if /test/put1 already exists, the content of
    # xia_20200522.txt is appended to /test/put1.
    res = hdfs.appendFile('/test/put1', '/home/envuser/etl/xia_20200522.txt')
    if res == 0:
        return 0
    else:
        return -1
import hdfs

def main():
    # Append content to a file on HDFS.
    # In the following example, if the file /test/put1 does not exist, it is
    # created automatically.
    # By default, the appended content does not include a line feed; add one
    # at the start of the content if you need it on a new line.
    # res = hdfs.appendContent('File path', 'Content to append')
    res = hdfs.appendContent('/test/put1', 'add some content')
    if res == 0:
        return 0
    else:
        return -1
import hdfs

def main():
    # Upload the local file /home/envuser/etl/put3 to the /test directory on HDFS.
    # res = hdfs.put('File path on HDFS', 'Local file path')
    res = hdfs.put('/test', '/home/envuser/etl/put3')
    if res == 0:
        return 0
    else:
        return -1
import hdfs

def main():
    # Create a directory.
    res = hdfs.mkdir('/test')
    if res == 0:
        return 0
    else:
        return -1
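The calls above can also be combined in a single task. A minimal sketch that chains mkdir, put, and listDir, using only the functions shown in this section; the paths are illustrative:

import hdfs

def main():
    # Create a working directory, upload a local file into it, then list it.
    if hdfs.mkdir('/test') != 0:
        return -1
    if hdfs.put('/test', '/home/envuser/etl/put3') != 0:
        return -1
    if hdfs.listDir('/test') != 0:
        return -1
    return 0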
Hive
Use the following methods to access Hive with Python:
import hive

def main():
    # Run an execute statement, which returns 0 upon success.
    # The execute statement can run statements such as insert, create, load, and drop.
    res = hive.execute('''load data inpath '/testfile/emp.txt' into table emp''')
    if res == 0:
        return 0
    else:
        return -1
import hive

def main():
    # Run an executeUpdate statement, which returns the number of affected rows.
    # The executeUpdate statement can run statements such as insert, update, and delete.
    # A return value less than 0 indicates failure; 0 or a positive value indicates success.
    # The exact number of affected rows depends on the SQL statement.
    res = hive.executeUpdate('''insert into emp values('a','b')''')
    if res >= 0:
        return 0
    else:
        return -1
import hive

def main():
    # Run an executeQuery statement, which returns a ResultSet.
    # This statement can be used for query statements.
    rs = hive.executeQuery('select count(*) from emp')
    while rs.next():
        print(rs.getInt(1))
    return 0
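The three statement types can be combined in one task. A minimal sketch that creates a table, inserts a row, and prints the row count, using only execute, executeUpdate, and executeQuery as shown above; the table schema is illustrative:

import hive

def main():
    # Create the table, insert a row, then verify the row count.
    if hive.execute('''create table if not exists emp (a string, b string)''') != 0:
        return -1
    if hive.executeUpdate('''insert into emp values('a','b')''') < 0:
        return -1
    rs = hive.executeQuery('select count(*) from emp')
    while rs.next():
        print(rs.getInt(1))
    return 0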
Kafka
Use the following methods to access Kafka with Python:
from Msg import MsgBuilder, MeasurepointBuilder
import batchTSDBWriter

def main():
    # Build measurement point data. If a timestamp is not provided, the current
    # timestamp is used by default.
    mp1 = MeasurepointBuilder.builder().add_measurepoint("MeasurePoint11", 100).add_measurepoint("MeasurePoint12", "aa")
    mp2 = MeasurepointBuilder.builder().add_measurepoint("MeasurePoint21", 100).add_measurepoint("MeasurePoint22", "aa").set_timestamp(1542609276270)
    # Provide the OU ID, model ID, and asset ID.
    # Use set_modelIdPath to provide the model ID path.
    # Use add_payload to upload the measurement point data.
    # Use set_dq to initialize data quality.
    # Use add_dq to append data quality.
    msg = MsgBuilder.builder("1b47ed98d1800000", "inverter", "zabPDuHq").set_modelIdPath("/").add_payload(mp1).add_payload(mp2)
    # str(msg) is the message to be sent to Kafka.
    # The first parameter is a Boolean. When False is specified, measurement points are not validated.
    # The second parameter is a Boolean. When False is specified, assetId is not validated.
    # The length of the message must not exceed 3000 bytes.
    res = batchTSDBWriter.send_data(str(msg), False, False)
    if res == 0:
        return 0
    else:
        return -1
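Because the message must not exceed 3000 bytes, it can be worth checking the serialized length before sending. A minimal sketch reusing the builders above; the pre-send guard is an assumed pattern, not part of the batchTSDBWriter API:

from Msg import MsgBuilder, MeasurepointBuilder
import batchTSDBWriter

MAX_MSG_BYTES = 3000  # limit stated above

def main():
    mp = MeasurepointBuilder.builder().add_measurepoint("MeasurePoint11", 100)
    msg = MsgBuilder.builder("1b47ed98d1800000", "inverter", "zabPDuHq").set_modelIdPath("/").add_payload(mp)
    payload = str(msg)
    # Assumed guard: fail fast if the message would exceed the 3000-byte limit.
    if len(payload.encode('utf-8')) > MAX_MSG_BYTES:
        print('The task fails: message exceeds %d bytes' % MAX_MSG_BYTES)
        return -1
    res = batchTSDBWriter.send_data(payload, False, False)
    return 0 if res == 0 else -1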
S3
Use the following methods to access S3 with Python:
import s3Wrapper

def main():
    # Pass the name of the S3 data source in the current OU.
    s3_session = s3Wrapper.conn('S3 data source name in the current OU')
    # s3_session can be used for a series of operations.
    for bucket in s3_session.buckets.all():
        print('bucket name: %s' % bucket.name)
    return 0
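The bucket iteration above follows the boto3 resource API, so if the wrapper returns a boto3-style resource, the objects inside each bucket can be listed in the same way. A hedged sketch; the objects collection is an assumption about the returned session, not documented behavior of s3Wrapper:

import s3Wrapper

def main():
    s3_session = s3Wrapper.conn('S3 data source name in the current OU')
    for bucket in s3_session.buckets.all():
        print('bucket name: %s' % bucket.name)
        # Assumption: a boto3-style resource, where each bucket exposes an
        # objects collection.
        for obj in bucket.objects.all():
            print('  object key: %s' % obj.key)
    return 0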
Blob
Use the following methods to access Blob with Python:
import blobWrapper

def main():
    # Pass the ID of the Blob data source in the current OU.
    container_client = blobWrapper.conn('ID of Blob data source in the current OU')
    blob_list = container_client.list_blobs('commonfs')
    # blob_list can be used for a series of operations.
    for blob in blob_list:
        print("\t" + blob.name)
    return 0
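Each item in blob_list exposes a name attribute, so ordinary Python filtering applies. A minimal sketch that prints only the blobs under a given prefix; the 'etl/' prefix is illustrative:

import blobWrapper

def main():
    container_client = blobWrapper.conn('ID of Blob data source in the current OU')
    blob_list = container_client.list_blobs('commonfs')
    # Filter blob names with plain Python; the 'etl/' prefix is illustrative.
    for blob in blob_list:
        if blob.name.startswith('etl/'):
            print("\t" + blob.name)
    return 0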