Python内置函数map、reduce、filter在文本处理中的应用

文件是由很多行组成的,这些行组成一个列表,python提供了处理列表很有用的三个函数:map、reduce、filter。因此在文本处理中,可以使用这三个函数达到代码的更加精简清晰。

这里的map、reduce是python的内置函数,跟hadoop的map、reduce函数没有关系,不过使用的目的有点类似,map函数做预处理、reduce函数一般做聚合。

map、reduce、filter在文本处理中的使用

下面是一个文本文件的内容,第1列是ID,第4列是权重,我们的目标是获取所有ID是奇数的行,将这些行的权重翻倍,最后返回权重值的总和。

ID权重
1name1value111
2name2value212
3name3value313
4name4value414
5name5value515
6name6value616
7name7value717
8name8value818
9name9value919
10name10value1020

使用filter、map、reduce函数的代码如下;

#coding=utf8

'''
Created on 2013-12-15

@author: www.crazyant.net
'''
import pprint

def read_file(file_path):
    '''
            读取文件的每一行,按\t分割后返回字段列表;
    '''
    with open(file_path,"r") as fp:
        for line in fp:
            fields=line[:-1].split("\t")
            yield fields
    fp.close()

def is_even_lines(fields):
    '''
            判断该行是否第一列的数字为偶数;
    '''
    return int(fields[0])%2==0

def double_weights(fields):
    '''
            将每一行的权重这一字段的值翻倍
    '''
    fields[-1]=int(fields[-1])*2
    return fields

def sum_weights(sum_value, fields):
    '''
            累加数字x到数字sum_value上面;
            返回新的sum_value值;
    '''
    sum_value+=int(fields[-1])
    return sum_value

if __name__=="__main__":
    #读取文件中的所有行
    file_lines=[x for x in read_file("test_data")]
    print '文件中原始的行:'
    pprint.pprint(file_lines)
    
    print '----'
    
    #过滤掉ID为偶数的行
    odd_lines=filter(is_even_lines,file_lines)
    print '过滤掉ID为偶数的行:'
    pprint.pprint(odd_lines)
    
    print '----'
    
    #将每行的权重值翻倍
    double_weights_lines=map(double_weights,odd_lines)
    print '将每行的权重值翻倍:'
    pprint.pprint(double_weights_lines)
    
    print '----'
    
    #计算所有的权重值的和
    #由于传给sum函数的每个元素都是一个列表,所以需要先提供累加的初始值,这里指定为0
    sum_val=reduce(sum_weights, double_weights_lines, 0)
    print '计算每行权重值的综合:'
    print sum_val

运行结果:

文件中原始的行:
[['1', 'name1', 'value1', '11'],
 ['2', 'name2', 'value2', '12'],
 ['3', 'name3', 'value3', '13'],
 ['4', 'name4', 'value4', '14'],
 ['5', 'name5', 'value5', '15'],
 ['6', 'name6', 'value6', '16'],
 ['7', 'name7', 'value7', '17'],
 ['8', 'name8', 'value8', '18'],
 ['9', 'name9', 'value9', '19'],
 ['10', 'name10', 'value10', '20']]
----
过滤掉ID为偶数的行:
[['2', 'name2', 'value2', '12'],
 ['4', 'name4', 'value4', '14'],
 ['6', 'name6', 'value6', '16'],
 ['8', 'name8', 'value8', '18'],
 ['10', 'name10', 'value10', '20']]
----
将每行的权重值翻倍:
[['2', 'name2', 'value2', 24],
 ['4', 'name4', 'value4', 28],
 ['6', 'name6', 'value6', 32],
 ['8', 'name8', 'value8', 36],
 ['10', 'name10', 'value10', 40]]
----
计算每行权重值的综合:
160

 

map、reduce、filter函数的特点

  • filter函数:以列表为参数,返回满足条件的元素组成的列表;类似于SQL中的where a=1
  • map函数:以列表为参数,对每个元素做处理,返回这些处理后元素组成的列表;类似于sql中的select a*2
  • reduce函数:以列表为参数,对列表进行累计、汇总、平均等聚合函数;类似于sql中的select sum(a),average(b)

这些函数官方的解释

map(function, iterable, …)

Apply function to every item of iterable and return a list of the results. If additional iterable arguments are passed, function must take that many arguments and is applied to the items from all iterables in parallel. If one iterable is shorter than another it is assumed to be extended with None items. If function is None, the identity function is assumed; if there are multiple arguments, map() returns a list consisting of tuples containing the corresponding items from all iterables (a kind of transpose operation). The iterable arguments may be a sequence or any iterable object; the result is always a list.

reduce(function, iterable[, initializer])

Apply function of two arguments cumulatively to the items of iterable, from left to right, so as to reduce the iterable to a single value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates ((((1+2)+3)+4)+5). The left argument, x, is the accumulated value and the right argument, y, is the update value from the iterable. If the optional initializer is present, it is placed before the items of the iterable in the calculation, and serves as a default when the iterable is empty. If initializer is not given and iterable contains only one item, the first item is returned. Roughly equivalent to:

def reduce(function, iterable, initializer=None):
it = iter(iterable)
if initializer is None:
try:
initializer = next(it)
except StopIteration:
raise TypeError(‘reduce() of empty sequence with no initial value’)
accum_value = initializer
for x in it:
accum_value = function(accum_value, x)
return accum_value​

filter(function, iterable)

Construct a list from those elements of iterable for which function returns true. iterable may be either a sequence, a container which supports iteration, or an iterator. If iterable is a string or a tuple, the result also has that type; otherwise it is always a list. If function is None, the identity function is assumed, that is, all elements of iterable that are false are removed.

Note that filter(function, iterable) is equivalent to [item for item in iterable if function(item)] if function is not None and [item for item in iterable if item] if function is None.

See itertools.ifilter() and itertools.ifilterfalse() for iterator versions of this function, including a variation that filters for elements where the function returns false.

参考资料:

http://docs.python.org/2/library/functions.html

http://www.oschina.net/code/snippet_111708_16145

转载请注明来源: http://crazyant.net/1390.html

mysql根据A表更新B表的方法

最近遇到一个需求:mysql中A表和B表都有(id, age)字段,现在想读取B表的age字段,将其update到A表对应ID的age字段中去,我直接想到了一种方案:用Python读取B表,获得{id:age}形式的数据,然后根据每个ID和age的值依次update A表。

两个表分别定义和数据如下:

A表定义:

Field Type Comment
id int(11)  
name varchar(20)  
age int(11)  

数据:

1,name1,0
2,name2,0
3,name3,0
4,name4,0
5,name5,0

B表定义

Field Type Comment
id int(11)  
age int(11)  

数据:

1,11
2,21
3,31
4,41
5,51

python代码来实现

# -*- encoding:utf8 -*-
'''
@author: crazyant.net
读取B表的(id, age)数据,然后依次更新A表;
'''
from common.DBUtil import DB

dbUtil = DB('127.0.0.1',3306,'root','','test')

rs = dbUtil.query("SELECT id,age FROM table_b")

for row in rs:
    (idv,age)=row
    print (idv,age)
    update_sql="update table_a set age='%s' where id='%s';"%(age,idv)
    print update_sql
    dbUtil.update(update_sql)

print 'over'

其实一条SQL语句就可以搞定

 

看了看代码,实在是简单,于是网上搜了一下mysql能不能根据一个表更新另一个表,结果发现update本身就支持多个表更新的功能。

UPDATE table_a,table_b SET table_a.age=table_b.age WHERE table_a.id=table_b.id;

用python代码就显得是大炮打蚊子多次一举了。

 

转载请注明来源:链接

Python访问MySQL封装的常用类

python访问mysql比较简单,细节请参考我的另一篇文章:链接

自己平时也就用到两个mysql函数:查询和更新,下面是自己常用的函数的封装,大家拷贝过去直接可以使用。

文件名:DBUtil.py

# -*- encoding:utf8 -*-
'''
@author: crazyant.net
@version: 2013-10-22

封装的mysql常用函数
'''

import MySQLdb

class DB():
    def __init__(self, DB_HOST, DB_PORT, DB_USER, DB_PWD, DB_NAME):
        self.DB_HOST = DB_HOST
        self.DB_PORT = DB_PORT
        self.DB_USER = DB_USER
        self.DB_PWD = DB_PWD
        self.DB_NAME = DB_NAME
        
        self.conn = self.getConnection()

    def getConnection(self):
        return MySQLdb.Connect(
                           host=self.DB_HOST, #设置MYSQL地址
                           port=self.DB_PORT, #设置端口号
                           user=self.DB_USER, #设置用户名
                           passwd=self.DB_PWD, #设置密码
                           db=self.DB_NAME, #数据库名
                           charset='utf8' #设置编码
                           )

    def query(self, sqlString):
        cursor=self.conn.cursor()
        cursor.execute(sqlString)
        returnData=cursor.fetchall()
        cursor.close()
        self.conn.close()
        return returnData
    
    def update(self, sqlString):
        cursor=self.conn.cursor()
        cursor.execute(sqlString)
        self.conn.commit()
        cursor.close()
        self.conn.close()

if __name__=="__main__":
    db=DB('127.0.0.1',3306,'root','','wordpress')
    print db.query("show tables;")

使用方法为文件下面的main函数,使用query执行select语句并获取结果;或者使用update进行insert、delete等操作。

 

 

python执行shell的两种方法

有两种方法可以在Python中执行SHELL程序,方法一是使用Python的commands包,方法二则是使用subprocess包,这两个包均是Python现有的内置模块。

使用python内置commands模块执行shell

commands对Python的os.popen()进行了封装,使用SHELL命令字符串作为其参数,返回命令的结果数据以及命令执行的状态;

该命令目前已经废弃,被subprocess所替代;

# coding=utf-8
'''
Created on 2013年11月22日

@author: crazyant.net
'''
import commands
import pprint

def cmd_exe(cmd_String):
    print "will exe cmd,cmd:"+cmd_String
    return commands.getstatusoutput(cmd_String)

if __name__=="__main__":
    pprint.pprint(cmd_exe("ls -la"))

使用python最新的subprocess模块执行shell

Python目前已经废弃了os.system,os.spawn*,os.popen*,popen2.*,commands.*来执行其他语言的命令,subprocesss是被推荐的方法

subprocess允许你能创建很多子进程,创建的时候能指定子进程和子进程的输入、输出、错误输出管道,执行后能获取输出结果和执行状态。

# coding=utf-8
'''
Created on 2013年11月22日

@author: crazyant.net
'''
import shlex
import datetime
import subprocess
import time

def execute_command(cmdstring, cwd=None, timeout=None, shell=False):
    """执行一个SHELL命令
            封装了subprocess的Popen方法, 支持超时判断,支持读取stdout和stderr
           参数:
        cwd: 运行命令时更改路径,如果被设定,子进程会直接先更改当前路径到cwd
        timeout: 超时时间,秒,支持小数,精度0.1秒
        shell: 是否通过shell运行
    Returns: return_code
    Raises:  Exception: 执行超时
    """
    if shell:
        cmdstring_list = cmdstring
    else:
        cmdstring_list = shlex.split(cmdstring)
    if timeout:
        end_time = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
    
    #没有指定标准输出和错误输出的管道,因此会打印到屏幕上;
    sub = subprocess.Popen(cmdstring_list, cwd=cwd, stdin=subprocess.PIPE,shell=shell,bufsize=4096)
    
    #subprocess.poll()方法:检查子进程是否结束了,如果结束了,设定并返回码,放在subprocess.returncode变量中 
    while sub.poll() is None:
        time.sleep(0.1)
        if timeout:
            if end_time <= datetime.datetime.now():
                raise Exception("Timeout:%s"%cmdstring)
            
    return str(sub.returncode)

if __name__=="__main__":
    print execute_command("ls")

也可以在Popen中指定stdin和stdout为一个变量,这样就能直接接收该输出变量值。

总结

在python中执行SHELL有时候也是很必须的,比如使用Python的线程机制启动不同的shell进程,目前subprocess是Python官方推荐的方法,其支持的功能也是最多的,推荐大家使用。

转载请注明来源:http://crazyant.net/1319.html

Python封装的常用日期函数

处理日志数据时,经常要对日期进行进行计算,比如日期加上天数、日期相差天数、日期对应的周等计算,本文收集了几个常用的python日期功能函数,一直更新中。

直接贴代码(文件名DateUtil.py),函数功能可以直接查看注释:

# -*- encoding:utf8 -*-
'''
@author: crazyant
@version: 2013-10-12
'''
import datetime, time

#定义的日期的格式,可以自己改一下,比如改成"$Y年$m月$d日"
format_date = "%Y-%m-%d"
format_datetime = "%Y-%m-%d %H:%M:%S"

def getCurrentDate():
    '''
            获取当前日期:2013-09-10这样的日期字符串
    '''
    return time.strftime(format_date, time.localtime(time.time()))

def getCurrentDateTime():
    '''
            获取当前时间:2013-09-10 11:22:11这样的时间年月日时分秒字符串
    '''
    return time.strftime(format_datetime, time.localtime(time.time()))

def getCurrentHour():
    '''
            获取当前时间的小时数,比如如果当前是下午16时,则返回16
    '''
    currentDateTime=getCurrentDateTime()
    return currentDateTime[-8:-6] 

def getDateElements(sdate):
    '''
            输入日期字符串,返回一个结构体组,包含了日期各个分量
            输入:2013-09-10或者2013-09-10 22:11:22
            返回:time.struct_time(tm_year=2013, tm_mon=4, tm_mday=1, tm_hour=21, tm_min=22, tm_sec=33, tm_wday=0, tm_yday=91, tm_isdst=-1)
    '''
    dformat = ""
    if judgeDateFormat(sdate) == 0:
        return None
    elif judgeDateFormat(sdate) == 1:
        dformat = format_date
    elif judgeDateFormat(sdate) == 2:
        dformat = format_datetime
    sdate = time.strptime(sdate, dformat)
    return sdate

def getDateToNumber(date1):
    '''
            将日期字符串中的减号冒号去掉: 
            输入:2013-04-05,返回20130405
            输入:2013-04-05 22:11:23,返回20130405221123
    '''
    return date1.replace("-","").replace(":","").replace("","")

def judgeDateFormat(datestr):
    '''
            判断日期的格式,如果是"%Y-%m-%d"格式则返回1,如果是"%Y-%m-%d %H:%M:%S"则返回2,否则返回0
            参数 datestr:日期字符串
    '''
    try:
        datetime.datetime.strptime(datestr, format_date)
        return 1
    except:
        pass

    try:
        datetime.datetime.strptime(datestr, format_datetime)
        return 2
    except:
        pass

    return 0

def minusTwoDate(date1, date2):
    '''
            将两个日期相减,获取相减后的datetime.timedelta对象
            对结果可以直接访问其属性days、seconds、microseconds
    '''
    if judgeDateFormat(date1) == 0 or judgeDateFormat(date2) == 0:
        return None
    d1Elements = getDateElements(date1)
    d2Elements = getDateElements(date2)
    if not d1Elements or not d2Elements:
        return None
    d1 = datetime.datetime(d1Elements.tm_year, d1Elements.tm_mon, d1Elements.tm_mday, d1Elements.tm_hour, d1Elements.tm_min, d1Elements.tm_sec)
    d2 = datetime.datetime(d2Elements.tm_year, d2Elements.tm_mon, d2Elements.tm_mday, d2Elements.tm_hour, d2Elements.tm_min, d2Elements.tm_sec)
    return d1 - d2

def dateAddInDays(date1, addcount):
    '''
            日期加上或者减去一个数字,返回一个新的日期
            参数date1:要计算的日期
            参数addcount:要增加或者减去的数字,可以为1、2、3、-1、-2、-3,负数表示相减
    '''
    try:
        addtime=datetime.timedelta(days=int(addcount))
        d1Elements=getDateElements(date1)
        d1 = datetime.datetime(d1Elements.tm_year, d1Elements.tm_mon, d1Elements.tm_mday)
        datenew=d1+addtime
        return datenew.strftime(format_date)
    except Exception as e:
        print e
        return None

def is_leap_year(pyear):
    '''
            判断输入的年份是否是闰年 
    '''   
    try:                     
        datetime.datetime(pyear, 2, 29)
        return True          
    except ValueError:       
        return False         

def dateDiffInDays(date1, date2):
    '''
            获取两个日期相差的天数,如果date1大于date2,返回正数,否则返回负数
    '''
    minusObj = minusTwoDate(date1, date2)
    try:
        return minusObj.days
    except:
        return None

def dateDiffInSeconds(date1, date2):
    '''
            获取两个日期相差的秒数
    '''
    minusObj = minusTwoDate(date1, date2)
    try:
        return minusObj.days * 24 * 3600 + minusObj.seconds
    except:
        return None

def getWeekOfDate(pdate):
    '''
            获取日期对应的周,输入一个日期,返回一个周数字,范围是0~6、其中0代表周日
    '''
    pdateElements=getDateElements(pdate)

    weekday=int(pdateElements.tm_wday)+1
    if weekday==7:
        weekday=0
    return weekday

if __name__=="__main__":
    '''
            一些测试代码
    '''
    print judgeDateFormat("2013-04-01")
    print judgeDateFormat("2013-04-01 21:22:33")
    print judgeDateFormat("2013-04-31 21:22:33")
    print judgeDateFormat("2013-xx")
    print "--"
    print datetime.datetime.strptime("2013-04-01", "%Y-%m-%d")
    print 'elements'
    print getDateElements("2013-04-01 21:22:33")
    print 'minus'
    print minusTwoDate("2013-03-05", "2012-03-07").days
    print dateDiffInSeconds("2013-03-07 12:22:00", "2013-03-07 10:22:00")
    print type(getCurrentDate())
    print getCurrentDateTime()
    print dateDiffInSeconds(getCurrentDateTime(), "2013-06-17 14:00:00")
    print getCurrentHour()
    print dateAddInDays("2013-04-05",-5)
    print getDateToNumber("2013-04-05")
    print getDateToNumber("2013-04-05 22:11:33")

    print getWeekOfDate("2013-10-01")

转载请注明来源:http://crazyant.net/1309.html

python子类调用父类的方法

python和其他面向对象语言类似,每个类可以拥有一个或者多个父类,它们从父类那里继承了属性和方法。如果一个方法在子类的实例中被调用,或者一个属性在子类的实例中被访问,但是该方法或属性在子类中并不存在,那么就会自动的去其父类中进行查找。

继承父类后,就能调用父类方法和访问父类属性,而要完成整个集成过程,子类是需要调用的构造函数的。

子类不显式调用父类的构造方法,而父类构造函数初始化了一些属性,就会出现问题

如果子类和父类都有构造函数,子类其实是重写了父类的构造函数,如果不显式调用父类构造函数,父类的构造函数就不会被执行,导致子类实例访问父类初始化方法中初始的变量就会出现问题。

class A:
    def __init__(self):
        self.namea="aaa"

    def funca(self):
        print "function a : %s"%self.namea

class B(A):
    def __init__(self):
        self.nameb="bbb"

    def funcb(self):
        print "function b : %s"%self.nameb

b=B()
print b.nameb
b.funcb()

b.funca()

结果:

bbb
function b : bbb
Traceback (most recent call last):
  File "D:\workbench\python\MyPythonProject\test\study\overwrite_method.py", line 19, in <module>
    print b.funca()
  File "D:\workbench\python\MyPythonProject\test\study\overwrite_method.py", line 6, in funca
    print "function a : %s"%self.namea
AttributeError: B instance has no attribute 'namea'

在子类中,构造函数被重写,但新的构造方法没有任何关于初始化父类的namea属性的代码,为了达到预期的效果,子类的构造方法必须调用其父类的构造方法来进行基本的初始化。有两种方法能达到这个目的:调用超类构造方法的未绑定版本,或者使用super函数。

方法一:调用未绑定的超类构造方法

修改代码,多增一行:

class A:
    def __init__(self):
        self.namea="aaa"

    def funca(self):
        print "function a : %s"%self.namea

class B(A):
    def __init__(self):
        #这一行解决了问题
        A.__init__(self)
        self.nameb="bbb"

    def funcb(self):
        print "function b : %s"%self.nameb

b=B()
print b.nameb
b.funcb()

b.funca()

如上有注释的一行解决了该问题,直接使用父类名称调用其构造函数即可。

这种方法叫做调用父类的未绑定的构造方法。在调用一个实例的方法时,该方法的self参数会被自动绑定到实例上(称为绑定方法)。但如果直接调用类的方法(比如A.__init),那么就没有实例会被绑定。这样就可以自由的提供需要的self参数,这种方法称为未绑定unbound方法。

通过将当前的实例作为self参数提供给未绑定方法,B类就能使用其父类构造方法的所有实现,从而namea变量被设置。

方法二:使用super函数

修改代码,这次需要增加在原来代码上增加2行:

#父类需要继承object对象
class A(object):
    def __init__(self):
        self.namea="aaa"

    def funca(self):
        print "function a : %s"%self.namea

class B(A):
    def __init__(self):
        #这一行解决问题
        super(B,self).__init__()
        self.nameb="bbb"

    def funcb(self):
        print "function b : %s"%self.nameb

b=B()
print b.nameb
b.funcb()

b.funca()

如上有注释的为新增的代码,其中第一句让类A继承自object类,这样才能使用super函数,因为这是python的“新式类”支持的特性。当前的雷和对象可以作为super函数的参数使用,调用函数返回的对象的任何方法都是调用超类的方法,而不是当前类的方法。

super函数会返回一个super对象,这个对象负责进行方法解析,解析过程其会自动查找所有的父类以及父类的父类。

 方法一更直观,方法二可以一次初始化所有超类

super函数比在超累中直接调用未绑定方法更直观,但是其最大的有点是如果子类继承了多个父类,它只需要使用一次super函数就可以。然而如果没有这个需求,直接使用A.__init__(self)更直观一些。

Python生成文件md5校验值函数

linux有个命令叫做md5sum,能生成文件的md5值,一般情况下都会将结果记录到一个文件中用于校验使用,比如会这样使用:

[crazyant@localhost PythonMd5]$ more sample_file 
www.crazyant.net
www.51projob.com
[crazyant@localhost PythonMd5]$ md5sum sample_file > sample_file.md5file
[crazyant@localhost PythonMd5]$ more sample_file.md5file 
311d384505e3622ccf85d88930e2b0a0  sample_file
[crazyant@localhost PythonMd5]$ md5sum -c sample_file.md5file 
sample_file: OK

其中md5sum -c用于检测生成的md5值是否正确。

使用python生成文件md5值以及生成和md5sum结果一样的结果文件

python可以使用hashlib的md5模块对文件内容进行md5校验码生成,如果要生成和md5sum一样的结果文件,只需要将MD5结果值和文件名输出一行,中间有两个空格输出即可。

测试代码:

# -*- encoding:utf-8 -*-
from hashlib import md5
import os

def generate_file_md5value(fpath):
    '''以文件路径作为参数,返回对文件md5后的值
    '''
    m = md5()
    # 需要使用二进制格式读取文件内容
    a_file = open(fpath, 'rb')    
    m.update(a_file.read())
    a_file.close()
    return m.hexdigest()

def generate_file_md5sumFile(fpath):
    fname = os.path.basename(fpath)
    fpath_md5 = "%s.md5" % fpath
    fout = open(fpath_md5, "w")
    fout.write("%s  %s\n" % (generate_file_md5value(fpath), fname.strip()))
    print "generate success, fpath:%s" % fpath_md5
    fout.flush()
    fout.close()

if __name__ == "__main__":
    fpath = "/home/users/workbench/PythonMd5/sample_file"
    # 测试一:以文件路径作为参数,获得md5后的字符串
    print generate_file_md5value(fpath)
    
    # 测试二:生成和linux命令:md5sum同样结果的.md5文件
    generate_file_md5sumFile(fpath)

 

运行结果:

[crazyant@localhost PythonMd5]$ python generateMd5file.py
311d384505e3622ccf85d88930e2b0a0
generate success, fpath:/home/crazyant/workbench/PythonMd5/sample_file.md5
[crazyant@localhost PythonMd5]$ md5sum -c sample_file.md5
sample_file: OK

 

注意点

在windows下开发的代码,如果直接提交到linux运行,经常因为windows下的换行符是\r\n而linux是\n的缘故导致代码执行失败,一般情况下都要进行一下转换。

为eclipse安装python、shell开发环境和SVN插件

eclipse是一个非常好用的IDE,通常来说我们都用eclipse来开发JAVA程序,为了让开发python、shell等脚本也能在eclipse上运行,出现了很多相关的插件:

  • python:pydev
  • shell:shelled
  • svn:Subclipse
  • PHP:PDT

使用eclipse过程中可以多搜一搜网上的插件,有些能大幅提高开发效率,本文以开发linux环境下的python、shell脚本为背景,安装了eclipse并在其上装上了开发python、shell、svn插件,其中shell插件可以指定shell解释器,这样就能在windows下的eclispe中直接运行shell程序。
继续阅读为eclipse安装python、shell开发环境和SVN插件

Hadoop之使用python实现数据集合间join操作

hadoop之steaming介绍

hadoop有个工具叫做steaming,能够支持python、shell、C++、PHP等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明:

使用原生java语言实现Map-reduce程序
  1. hadoop准备好数据后,将数据传送给java的map程序
  2. java的map程序将数据处理后,输出O1
  3. hadoop将O1打散、排序,然后传给不同的reduce机器
  4. 每个reduce机器将传来的数据传给reduce程序
  5. reduce程序将数据处理,输出最终数据O2
借助hadoop streaming使用python语言实现Map-reduce程序
  1. hadoop准备好数据后,将数据传送给java的map程序
  2. java的map程序将数据处理成“键/值”对,并传送给python的map程序
  3. python的map程序将数据处理后,将结果传回给java的map程序
  4. java的map程序将数据输出为O1
  5. hadoop将O1打散、排序,然后传给不同的reduce机器
  6. 每个reduce机器将传来的数据处理成“键/值”对,并传送给python的reduce程序
  7. python的reduce程序将数据处理后,将结果返回给java的reduce程序
  8. java的reduce程序将数据处理,输出最终数据O2

上面红色表示map的对比,蓝色表示reduce的对比,可以看出streaming程序多了一步中间处理,这样说来steaming程序的效率和性能应该低于java版的程序,然而python的开发效率、运行性能有时候会大于java,这就是streaming的优势所在。

hadoop之实现集合join的需求

hadoop是用来做数据分析的,大都是对集合进行操作,因此该过程中将集合join起来使得一个集合能得到另一个集合对应的信息的需求非常常见。

比如以下这个需求,有两份数据:学生信息(学号,姓名)和学生成绩(学号、课程、成绩),特点是有个共同的主键“学号”,现在需要将两者结合起来得到数据(学号,姓名,课程,成绩),计算公式:

学号,姓名) join (学号,课程,成绩)= (学号,姓名,课程,成绩)

数据事例1-学生信息:

学号sno姓名name
01name1
02name2
03name3
04name4

数据事例2:-学生成绩:

学号sno课程号courseno成绩grade
010180
010290
020182
020295

期待的最终输出:

学号sno姓名name课程courseno成绩grade
01name10180
01name10290
02name20182
02name20295

实现join的注意点和易踩坑总结

如果你想写一个完善健壮的map reduce程序,我建议你首先弄清楚输入数据的格式、输出数据的格式,然后自己手动构建输入数据并手动计算出输出数据,这个过程中你会发现一些写程序中需要特别处理的地方:

  1. 实现join的key是哪个,是1个字段还是2个字段,本例中key是sno,1个字段
  2. 每个集合中key是否可以重复,本例中数据1不可重复,数据2的key可以重复
  3. 每个集合中key的对应值是否可以不存在,本例中有学生会没成绩,所以数据2的key可以为空

第1条会影响到hadoop启动脚本中key.fields和partition的配置,第2条会影响到map-reduce程序中具体的代码实现方式,第3条同样影响代码编写方式。

hadoop实现join操作的思路

具体思路是给每个数据源加上一个数字标记label,这样hadoop对其排序后同一个字段的数据排在一起并且按照label排好序了,于是直接将相邻相同key的数据合并在一起输出就得到了结果。

1、 map阶段:给表1和表2加标记,其实就是多输出一个字段,比如表一加标记为0,表2加标记为2;

2、 partion阶段:根据学号key为第一主键,标记label为第二主键进行排序和分区

3、 reduce阶段:由于已经按照第一主键、第二主键排好了序,将相邻相同key数据合并输出

hadoop使用python实现join的map和reduce代码

mapper.py的代码:

# -*- coding: utf-8 -*-
#Mapper.py
#来自疯狂的蚂蚁www.crazyant.net
import os
import sys

#mapper脚本
def mapper():
	#获取当前正在处理的文件的名字,这里我们有两个输入文件
	#所以要加以区分
	filepath = os.environ["map_input_file"]
	filename = os.path.split(filepath)[-1]
	for line in sys.stdin:
		if line.strip()=="":
			continue
		fields = line[:-1].split("\t")
		sno = fields[0]
		#以下判断filename的目的是不同的文件有不同的字段,并且需加上不同的标记
		if filename == 'data_info':
			name = fields[1]
			#下面的数字'0'就是为数据源1加上的统一标记
			print '\t'.join((sno,'0',name))
		elif filename == 'data_grade':
			courseno = fields[1]
			grade = fields[2]
			#下面的数字'1'就是为数据源1加上的统一标记
			print '\t'.join((sno,'1',courseno,grade))

if __name__=='__main__':
	mapper()

reducer的代码:

# -*- coding: utf-8 -*-
#reducer.py
#来自疯狂的蚂蚁www.crazyant.net
import sys

def reducer():
	#为了记录和上一个记录的区别,用lastsno记录上个sno
	lastsno = ""

	for line in sys.stdin:
		if line.strip()=="":
			continue
		fields = line[:-1].split("\t")
		sno = fields[0]
		'''
		处理思路:
		遇见当前key与上一条key不同并且label=0,就记录下来name值,
		当前key与上一条key相同并且label==1,则将本条数据的courseno、
		grade联通上一条记录的name一起输出成最终结果
		'''
		if sno != lastsno:
			name=""
			#这里没有判断label==1的情况,
			#因为sno!=lastno,并且label=1表示该条key没有数据源1的数据
			if fields[1]=="0":
				name=fields[2]
		elif sno==lastno:
			#这里没有判断label==0的情况,
			#因为sno==lastno并且label==0表示该条key没有数据源2的数据
			if fields[2]=="1":
				courseno=fields[2]
				grade=fields[3]
				if name:
					print '\t'.join((lastsno,name,courseno,grade))
		lastsno = sno

if __name__=='__main__':
	reducer()

使用shell脚本启动hadoop程序的方法:

#先删除输出目录
~/hadoop-client/hadoop/bin/hadoop fs -rmr /hdfs/jointest/output
#来自疯狂的蚂蚁www.crazyant.net
#注意,下面配置中的环境值每个人机器不一样
~/hadoop-client/hadoop/bin/hadoop streaming \
	-D mapred.map.tasks=10 \
	-D mapred.reduce.tasks=5 \
	-D mapred.job.map.capacity=10 \
	-D mapred.job.reduce.capacity=5 \
	-D mapred.job.name="join--sno_name-sno_courseno_grade" \
	-D num.key.fields.for.partition=1 \
	-D stream.num.map.output.key.fields=2 \
	-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
	-input "/hdfs/jointest/input/*" \
	-output "/hdfs/jointest/output" \
	-mapper "python26/bin/python26.sh mapper.py" \
	-reducer "python26/bin/python26.sh reducer.py" \
	-file "mapper.py" \
	-file "reducer.py" \
	-cacheArchive "/share/python26.tar.gz#python26"

#看看运行成功没,若输出0则表示成功了
echo $?

可以自己手工构造输入输出数据进行测试,本程序是验证过的。

更多需要注意的地方

hadoop的join操作可以分为很多类型,各种类型脚本的编写有所不同,其分类是按照key字段数目、value字段数目、key是否可重复来划分的,以下是一个个人总结的对照表,表示会影响的地方:

影响类型影响的范围
key字段数目1、启动脚本中num.key.fields.for.partition的配置2、启动脚本中stream.num.map.output.key.fields的配置

3、map和reduce脚本中key的获取

4、map和reduce脚本中每一条数据和上一条数据比较的方法key是否可重复如果数据源1可重复,标记为M;数据源2可重复标记为N,那么join可以分为:1*1、M*1、M*N类型

1*1类型:reduce中先记录第一个value,然后在下一条直接合并输出;

M*1类型:将类型1作为标记小的输出,然后每次遇见label=1就记录value,每遇见一次label=2就输出一次最终结果;

M*N类型:遇见类型1,就用数组记录value值,遇见label=2就将将记录的数组值全部连同该行value输出。value字段数目影响每次label=1时记录的数据个数,需要将value都记录下来

原文链接 转载须注明!

Django基本命令最全收集

Django是一个python用于快速开发web应用的框架,它的很多特性使用极其方便快捷。当创建一个django项目和对项目进行管理的时候,会涉及到很多命令行命令。本文对其进行一些总结,以供方便查询。

django-admin.py startproject mysite

该命令在当前目录创建一个 mysite 目录。

django-admin.py这个文件在C:\Python27\Lib\site-packages\django\bin文件夹里,可以把该目录添加到系统Path里面。

Django内置一个轻量级的Web服务器。

进入 mysite 目录的话,现在进入其中,并运行 python manage.py runserver 命令

启动服务器,用http://127.0.0.1:8000/可以进行浏览了,8000是默认的端口号。

python manage.py runserver 8080

更改服务器端口号

python manage.py shell

启动交互界面

python manage.py startapp books

创建一个app,名为books

python manage.py validate

验证Django数据模型代码是否有错误

python manage.py sqlall books

为模型产生sql代码

python manage.py syncdb

运行sql语句,创建模型相应的Table

python manage.py dbshell

启动数据库的命令行工具

manage.py sqlall books

查看books这个app下所有的表

python manage.py syncdb

同步数据库,生成管理界面使用的额外的数据库表