Python访问各类数据源方法


本文描述了如何使用 Python 访问 HDFS、Hive、Kafka、S3、Blob等数据源中的数据。

成功判断

必须定义一个 main 函数,并且包含返回值:

def main():
    print(1)
    return 0


任务成功判断示例,返回 0:

def main():
    print(‘The task runs successfully’)
    return 0


任务失败包括如下情况:

  • 返回值不是 0

  • 程序抛出异常导致程序无法正常运行

例如:

def main():
    print(‘The task fails’)
    return 'fail'

def main():
    print(‘The task fails’)
    return -1

def main():
    print(‘The task fails’)
    raise Exception(‘raise exeception, let task fail’)

HDFS

使用 Python 访问 HDFS 的方法如下:

import hdfs
def main():
    #列出当前 OU 的 HDFS 的根目录下的文件
    res = hdfs.listDir('/')
    if res == 0:
        return 0
    else:
        return -1

import hdfs
def main():
    #将本地文件上传到 HDFS 上
    #res = hdfs.appendFile('要放到HDFS上目录下的文件路径','容器本地文件的路径')
    #如果 /test/put1 已经存在,那么会把 xia_20200522.txt 文件的内容追加到 /test/put1 这个文件中
    res = hdfs.appendFile('/test/put1','/home/envuser/etl/xia_20200522.txt')
    if res == 0:
        return 0
    else:
        return -1

import hdfs
def main():
    #对HDFS文本里追加内容
    #如果 '/test/put1' 不存在,会自动创建
    #追加的内容默认不换行,换行追加需要在文件头加上换行符
    #res = hdfs.appendContent('文件路径','追加的内容')
    res = hdfs.appendContent('/test/put1','add some content')
    if res == 0:
        return 0
    else:
        return -1

import hdfs
def main():
    #把容器本地的一个 /home/envuser/etl/put3 文件放到HDFS上 /user/db_hongtao_hao/test 目录下
    #res = hdfs.put('要放到HDFS上目录下的文件路径','本地文件的路径')
    hdfs.put('/test','/home/envuser/etl/put3')
    if res == 0:
        return 0
    else:
        return -1

import hdfs
def main():
    #新建目录
    res = hdfs.mkdir('/test')
    if res == 0:
        return 0
    else:
        return -1

Hive

使用 Python 访问 Hive 的方法如下:

import hive
def main():
    #执行execute语句,返回0代表成功
    #execute语句建议用来执行insert,create,load,drop等语句
    rs = hive.execute('''load data inpath '/testfile/emp.txt' into table emp''')
    if res == 0:
        return 0
    else:
        return -1

import hive
def main():
    #执行executeUpdate语句,返回执行受到影响的行数
    #建议用来执行insert,update,delete语句
    #返回值小于0为执行失败,大于等于0都代表执行成功
    #返回值代表执行受到影响的行数,但是要根据实际SQL语句判断,部分HQL语句并不会返回执行受到影响的行数
    rs = hive.executeUpdate('''insert into emp values('a','b')''')
    if res >= 0:
        return 0
    else:
        return -1

import hive
def main():
    #执行executeQuery语句,返回ResultSet
    #建议用来执行查询语句
    rs = hive.executeQuery('select count(*) from emp')
    while rs.next():
        print(rs.getInt(1))
    return 0

Kafka

使用 Python 访问 Kafka 的方法如下:

from Msg import MsgBuilder,MeasurepointBuilder

import batchTSDBWriter

def main():
    #构建测点数据,如果不传入时间戳,我们会以当前时间戳为默认值

    mp1 = MeasurepointBuilder.builder().add_measurepoint("MeasurePoint11", 100).add_measurepoint("MeasurePoint12", "aa")
    mp2 = MeasurepointBuilder.builder().add_measurepoint("MeasurePoint21", 100).add_measurepoint("MeasurePoint22", "aa").set_timestamp(1542609276270)

    #需要依次传入组织ID,模型ID,设备资产ID
    #可以通过set_modelIdPath传入路径
    #可以通过add_payload上传测点
    #set_dq初始化一个新的dq
    #add_dq向之前的dq追加
    msg = MsgBuilder.builder("1b47ed98d1800000","inverter","zabPDuHq")set_modelIdPath("/").add_payload(mp1).add_payload(mp2)

    #str(msg)要发入kafka的message
    #第一个参数为Boolean,值为False时,不对measurepoints进行校验
    #第二个参数为Boolean,值为False时,不对assetId进行校验
    #message字节数不可大于 3000 byte
    res = batchTSDBWriter.send_data(str(msg),False,False)

    if res == 0:
        return 0
    else:
        return -1

S3

使用 Python 访问 S3 的方法如下:

import s3Wrapper

def main():
    s3_session = s3Wrapper.conn(当前OU下可用S3数据源的ID)
    #可以利用 s3_session 做一系列操作
    for bucket in s3_session.buckets.all():
        print('bucket name:%s'%bucket.name)
    return 0

Blob

使用 Python 访问 Blob 的方法如下:

import blobWrapper

def main():
    container_client = blobWrapper.conn(当前OU下可用Blob数据源的ID)
    blob_list = container_client.list_blobs('commonfs')
    #可以利用 blob_list 做一系列操作
    for blob in blob_list:
        print("\t" + blob.name)
    return 0