Python访问各类数据源方法¶
本文描述了如何使用 Python 访问 HDFS、Hive、Kafka、S3、Blob等数据源中的数据。
成功判断¶
必须定义一个 main
函数,并且包含返回值:
def main():
print(1)
return 0
任务成功判断示例,返回 0:
def main():
print(‘The task runs successfully’)
return 0
任务失败包括如下情况:
返回值不是 0
程序抛出异常导致程序无法正常运行
例如:
def main():
print(‘The task fails’)
return 'fail'
def main():
print(‘The task fails’)
return -1
def main():
print(‘The task fails’)
raise Exception(‘raise exeception, let task fail’)
HDFS¶
使用 Python 访问 HDFS 的方法如下:
import hdfs
def main():
#列出当前 OU 的 HDFS 的根目录下的文件
res = hdfs.listDir('/')
if res == 0:
return 0
else:
return -1
import hdfs
def main():
#将本地文件上传到 HDFS 上
#res = hdfs.appendFile('要放到HDFS上目录下的文件路径','容器本地文件的路径')
#如果 /test/put1 已经存在,那么会把 xia_20200522.txt 文件的内容追加到 /test/put1 这个文件中
res = hdfs.appendFile('/test/put1','/home/envuser/etl/xia_20200522.txt')
if res == 0:
return 0
else:
return -1
import hdfs
def main():
#对HDFS文本里追加内容
#如果 '/test/put1' 不存在,会自动创建
#追加的内容默认不换行,换行追加需要在文件头加上换行符
#res = hdfs.appendContent('文件路径','追加的内容')
res = hdfs.appendContent('/test/put1','add some content')
if res == 0:
return 0
else:
return -1
import hdfs
def main():
#把容器本地的一个 /home/envuser/etl/put3 文件放到HDFS上 /user/db_hongtao_hao/test 目录下
#res = hdfs.put('要放到HDFS上目录下的文件路径','本地文件的路径')
hdfs.put('/test','/home/envuser/etl/put3')
if res == 0:
return 0
else:
return -1
import hdfs
def main():
#新建目录
res = hdfs.mkdir('/test')
if res == 0:
return 0
else:
return -1
Hive¶
使用 Python 访问 Hive 的方法如下:
import hive
def main():
#执行execute语句,返回0代表成功
#execute语句建议用来执行insert,create,load,drop等语句
rs = hive.execute('''load data inpath '/testfile/emp.txt' into table emp''')
if res == 0:
return 0
else:
return -1
import hive
def main():
#执行executeUpdate语句,返回执行受到影响的行数
#建议用来执行insert,update,delete语句
#返回值小于0为执行失败,大于等于0都代表执行成功
#返回值代表执行受到影响的行数,但是要根据实际SQL语句判断,部分HQL语句并不会返回执行受到影响的行数
rs = hive.executeUpdate('''insert into emp values('a','b')''')
if res >= 0:
return 0
else:
return -1
import hive
def main():
#执行executeQuery语句,返回ResultSet
#建议用来执行查询语句
rs = hive.executeQuery('select count(*) from emp')
while rs.next():
print(rs.getInt(1))
return 0
Kafka¶
使用 Python 访问 Kafka 的方法如下:
from Msg import MsgBuilder,MeasurepointBuilder
import batchTSDBWriter
def main():
#构建测点数据,如果不传入时间戳,我们会以当前时间戳为默认值
mp1 = MeasurepointBuilder.builder().add_measurepoint("MeasurePoint11", 100).add_measurepoint("MeasurePoint12", "aa")
mp2 = MeasurepointBuilder.builder().add_measurepoint("MeasurePoint21", 100).add_measurepoint("MeasurePoint22", "aa").set_timestamp(1542609276270)
#需要依次传入组织ID,模型ID,设备资产ID
#可以通过set_modelIdPath传入路径
#可以通过add_payload上传测点
#set_dq初始化一个新的dq
#add_dq向之前的dq追加
msg = MsgBuilder.builder("1b47ed98d1800000","inverter","zabPDuHq")set_modelIdPath("/").add_payload(mp1).add_payload(mp2)
#str(msg)要发入kafka的message
#第一个参数为Boolean,值为False时,不对measurepoints进行校验
#第二个参数为Boolean,值为False时,不对assetId进行校验
#message字节数不可大于 3000 byte
res = batchTSDBWriter.send_data(str(msg),False,False)
if res == 0:
return 0
else:
return -1
S3¶
使用 Python 访问 S3 的方法如下:
import s3Wrapper
def main():
s3_session = s3Wrapper.conn(当前OU下可用S3数据源的ID)
#可以利用 s3_session 做一系列操作
for bucket in s3_session.buckets.all():
print('bucket name:%s'%bucket.name)
return 0
Blob¶
使用 Python 访问 Blob 的方法如下:
import blobWrapper
def main():
container_client = blobWrapper.conn(当前OU下可用Blob数据源的ID)
blob_list = container_client.list_blobs('commonfs')
#可以利用 blob_list 做一系列操作
for blob in blob_list:
print("\t" + blob.name)
return 0