Spark使用JAVA编写自定义函数修改DataFrame

本文的代码涉及几个知识点,都是比较有用:

1、Spark用JAVA编写代码的方式;

2、Spark读取MySQL数据表,并且使用的是自定义SQL的方式,默认会读取整个表的;

3、Spark使用sql.functions的原有方法,给dataframe新增列、变更列;

4、Spark使用udf的自定义函数,给dataframe新增列、变更列;

/**
 * spark直接读取mysql
 */
private static Dataset<Row> queryMySQLData(SparkSession spark) {
    Properties properties = new Properties();
    properties.put("user", "root");
    properties.put("password", "12345678");
    properties.put("driver", "com.mysql.jdbc.Driver");
    // 可以写SQL语句查询数据结果
    return spark.read().jdbc(
            "jdbc:mysql://127.0.0.1:3306/test"
            , "(select id, name from tb_data) tsub",
            properties);
}

整个函数使用spark.read().jdbc读取mysql数据表,配置了mysql的user、passpord、driver,jdbcurl,以及可以通过sql语句执行数据查询,sql语句这里在spark源文档是table name,如果只设置table name,则会读取整个表,可以使用(select id, name from tb_data) tsub的方式读取SQL结果,注意的是这里必须给SQL语句设定一个标的别名。

以下是几种给dataframe添加新列、修改原有列的方法

方法1:使用functions中的函数,有一些局限性

// 方法1:使用functions中的函数,有一些局限性
inputData.withColumn("name_length_method1", functions.length(inputData.col("name")));

使用的是sql.functions里面的方法,里面支持了大部分的size、length等等方法,不过还是不够灵活,因为不支持就是不支持;

方法2:自定义注册udf,可以用JAVA代码写处理

可以先用spark.udf().register注册方法,然后使用functions.callUDF进行调用,其中自定义方法需要实现UDF1~UDF20的接口,分别代表传入不同的入参列:

// 方法2:自定义注册udf,可以用JAVA代码写处理
spark.udf().register(
        "getLength",
        new UDF1<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return s.length();
            }
        },
        DataTypes.IntegerType);

inputData = inputData.withColumn(
        "name_length_method2",
        functions.callUDF("getLength",
                inputData.col("name"))
);

// 方法2.1:可以写UDF2~UDF20,就是把输入字段变成多个
spark.udf().register(
        "getLength2",
        new UDF2<Long, String, Long>() {

            @Override
            public Long call(Long aLong, String s) throws Exception {
                return aLong + s.length();
            }
        },
        DataTypes.LongType);

inputData = inputData.withColumn(
        "name_length_method3",
        functions.callUDF(
                "getLength2",
                inputData.col("id"),
                inputData.col("name"))
);

inputData.show(20, false);

代码地址见:github地址

tb_data的mysql表数据读取后的原始dataframe的schema:

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

数据:

+---+-----------+
|id |name       |
+---+-----------+
|1  |name1      |
|2  |name22     |
|3  |name333    |
|4  |name4444   |
|5  |name55555  |
|6  |name666666 |
|7  |name7777777|
+---+-----------+

最终计算之后的数据输出:

+---+-----------+-------------------+-------------------+-------------------+
|id |name       |name_length_method1|name_length_method2|name_length_method3|
+---+-----------+-------------------+-------------------+-------------------+
|1  |name1      |5                  |5                  |6                  |
|2  |name22     |6                  |6                  |8                  |
|3  |name333    |7                  |7                  |10                 |
|4  |name4444   |8                  |8                  |12                 |
|5  |name55555  |9                  |9                  |14                 |
|6  |name666666 |10                 |10                 |16                 |
|7  |name7777777|11                 |11                 |18                 |
+---+-----------+-------------------+-------------------+-------------------+

 

MySQL导入导出数据时遇到Tab符号和换行符号怎么办?

在做ETL(下载、转换、导入)开发的时候,经常会遇到从MySQL中导出数据,经过计算后再导入到MySQL的场景。

那么有一个很难绕过的问题,如果源MySQL的字段中,包含了\t、\n特殊字符,该怎么办?

因为导出文件时,默认是按照\t分割字段、\n分割行,现在字段中出现了\t和\n,这不是乱了吗?

同时导入文件时,load data可以指定fields和lines的分隔符,默认情况都是\t和\n,可是现在字段中有\t和\n,这不乱了吗?

最直观想到的办法,是导出的时候,在select语句中,使用mysql的replace函数,将\t和\n替换成普通字符串,使用程序处理,load到库里之后,在使用replace替换。但是如果数据量很大的话,替换\t和\n很不现实。

其实,这个问题真的是个问题吗?

答案是:根本不用考虑\t和\n

1)MySQL的select导出时,会自动把\t和\n转义成\\t和\\n;

2)在shell、Python的代码中,按行读取,程序遇到\\n,会自动略过的;

3)在shell、Python的代码中,按\t分割,程序遇到\\t,会自动略过的;

4)load data导入数据的时候,遇到了\\t和\\n,也当成普通字符处理;

做一个测试就知道了:

1、准备好数据库、数据表、测试数据

CREATE DATABASE `test` /*!40100 DEFAULT CHARACTER SET utf8 */

use test;
set names utf8;

CREATE TABLE `table_from` (
  `id` INT(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `name` VARCHAR(500) DEFAULT NULL COMMENT '名称',
  `remark` VARCHAR(500) DEFAULT NULL COMMENT '备注',
  KEY `id` (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT '导入测试的来源表';

CREATE TABLE `table_to` (
  `id` INT(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `name` VARCHAR(500) DEFAULT NULL COMMENT '名称',
  `remark` VARCHAR(500) DEFAULT NULL COMMENT '备注',
  KEY `id` (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT '导入测试的目的表';

truncate table table_from;
truncate table table_to;
insert into table_from values(1,'name1','我们都是屌丝程序员');
insert into table_from values(2,'name2','我们都是屌丝程序员\n');
insert into table_from values(3,'name3','我们都是\t屌丝程序员');
insert into table_from values(4,'name4','我们都是\t屌丝程序员\n');

2、执行导出数据:

mysql -h127.0.0.1 -uroot -p123456 -P3306 -e "
	use test; 
	set names utf8; 
	select * from table_from;
" > /tmp/dump_table_from.txt

查看一下/tmp/dump_table_from.txt

more /tmp/dump_table_from.txt 
id      name    remark
1       name1   我们都是屌丝程序员
2       name2   我们都是屌丝程序员\n
3       name3   我们都是\t屌丝程序员
4       name4   我们都是\t屌丝程序员\n

发现导出的数据中,出现了\t和\n,但是这俩字符是直接显示出来了,而不是变成了真正的TAB和换行。

我们用vim打开:

vim /tmp/dump_table_from.txt

输入\t搜索,发现只匹配到了隐藏的真正的TAB;

输入\\t搜索,才搜到了直接显示出来的\t;

搜索\n和\\n,效果相同;

这就说明,导出的文件中,字段中如果包含了\t和\n,会自动变成\\t和\\n的;

2、使用程序处理

使用Python写一个测试程序,看能不能正常按行读取和按tab分割:

打开Python命令行:

>>> for row in open("/tmp/dump_table_from.txt"):
...     print "#"+row+"#"                       
...     print "#".join(str(row).split("\t"))      
... 
#id     name    remark
#
id#name#remark

#1      name1   我们都是屌丝程序员
#
1#name1#我们都是屌丝程序员

#2      name2   我们都是屌丝程序员\n
#
2#name2#我们都是屌丝程序员\n

#3      name3   我们都是\t屌丝程序员
#
3#name3#我们都是\t屌丝程序员

#4      name4   我们都是\t屌丝程序员\n
#
4#name4#我们都是\t屌丝程序员\n

>>>

可以看到,程序会正常的按\n分割行,按\t分割字段,并没有按照字段内部出现的\t和\n分割,因为他们都是\\t和\\n。

3、直接将文件导入到一个新表

执行导入:

mysql -h127.0.0.1 -uroot -p123456 -P3306 -e "
	use test; 
	set names utf8; 
	load data local 
		infile '/tmp/dump_table_from.txt' 
		into table table_to 
		fields terminated by'\t' 
		lines terminated by'\n' ignore 0 lines (id,name,remark)";

执行命令没有异常,打开目标表,看下结果,发现跟源表一模一样;

最终结论:

1、使用select导出MySQL数据的时候,字段里的\t和\n,会自动的替换成\\t和\\n;

2、在使用shell、Python读取文件的时候,如果遇到了\\n,不会作为行分隔符;使用split(‘\t’)函数分割的时候,如果遇到了\\t,会略过;

3、使用load data向MySQL导入数据的时候,里面的\\t和\\n,不会作为字段分隔符和行分隔符;

 

本文地址:http://crazyant.net/1901.html,转载请注明来源。

有了事务为什么还需要乐观锁和悲观锁

transaction-lock为什么有了事务这东西,还需要乐观锁悲观锁?事务是粗粒度的概念、乐观锁悲观锁可以更细粒度的控制;
比如抢票,假设余票只有1张;隔离级别可以保证事务A和事务B不能读到对方的数据,也不能更新对方正在更新的数据,但是事务A和事务B都认为还有1张余票,于是出票,并更新为0;

事务解决了并发问题,已经不存在并发问题了;

但是事务B读取的是过时数据,依据过时数据做了业务处理;

所以需要乐观锁或者悲观锁,来记录一个信息:当前已经读取的数据,是不是已经过时了!

事务有这么几种实现方式:锁协议、MVCC、时间戳排序协议、有效性检查协议,锁协议是事务的一种实现方式,事务 = 用锁封装的一个函数,可以重用而已,但是这几个事务的函数覆盖面太粗粒度了,所以有时候我们还得借助于锁来进行细粒度控制;
事务不能保证每个操作结果正确,售票时超卖还是会发生。
事务保证整个操作的成一个组,要么全做要么全不做 但是不能保证多个事务同时读取同一个数据
数据对象被加上排它锁时,其他的事务不能对它读取和修改;加了共享锁的数据对象可以被其他事务读取,但不能修改
事务可以用锁实现,可以保证一致性和隔离性,但是锁用来保证并发性;
隔离性和并发性有点类似,但是隔离性只是保证不会出现相互读取中间数据,却无法解决并发的问题

数据库并发控制机制的理解

隔离级别相当于数据库实现的一套现有的机制,我们直接可以复用;但对于特殊需求,我们可以自己使用锁机制来实现,其实我们自己就可以使用锁机制,实现一套隔离级别;

锁机制只是隔离性级别的一种实现;我的感触是,类似封装了函数,数据库隔离性级别,是帮我们实现了几个线程的并发控制方法。但是一些特殊的并发控制,我们可以自己使用锁机制来实现;

QQ-20150527190408

1、如果事务A设置为read uncommitted,那么事务B做了update还未提交,事务A能够读取到事务B更新的数据,事务B如果回滚,事务A则看到事务B回滚后的数据;
2、如果事务A设置为read committed,那么事务B做了update并且没有提交,事务A是读取不到事务B更新的数据的;
3、如果事务A设置为repeatable read,那么事务B做了update并提交,事务A仍然读取不到,事务B即使多次commit,事务A全都读取不到;
4、如果事务A设置为serializable,如果事务B已经开始运行并做了更新,那么事务A的任何操作得一直等待;如果B没做更新,则A还是能读取的;

以上四种级别,事务A如果正在更新一条数据,事务B如果要更新同一条数据,则会等待直到超时,因为事务A更新这条数据时加上了排它锁;

Python操作MySQL视频教程

给大家带来自己制作的Python操作MySQL视频教程。本教程分为三节:Python开发环境搭建以及支持MySQL开发的插件安装、Python访问MySQL数据库的标准API规范接口讲解、Python开发MySQL程序实战编码演示。通过课 程的学习,大家能够基本掌握用Python开发MySQL程序。

视频高清版百度链接: http://pan.baidu.com/s/1DB0qM 密码: ri1n

Python操作MySQL视频教程第一讲 – 开发环境搭建

推荐使用以下的开发环境搭配:

  • Eclipse + JDK7
    • 插件:PyDev 3.8.0
  • python-2.7.8
    • 插件:MySQL-python-1.2.4b4.win32-py2.7
  • MySQL服务器:使用wampserver2.5软件包自带的MySQL软件
    • 需要安装:vcredist_x64
    • Mysql-5.6.17

本视频在优酷的地址:http://v.youku.com/v_show/id_XODE3Nzk4MTEy.html

Python操作MySQL视频教程第二讲 – 标准接口规范

第二讲的视频教程讲解的主要内容是:

  • Python官方针对操作数据库的标准规范
  • Python建立和数据库的connect连接对象
    • connection对象的构造函数,包括主机、端口、用户名、密码、编码等参数
    • connection对象的方法,主要是关闭连接、获取游标、提交事务、回滚事务
  • Python执行SQL语句的cursor对象
    • 普通游标和字典游标的区别,以及字典游标优于普通游标的原因
    • 游标执行SQL语句的方法
    • 游标获取执行SQL语句结果集合的方法
  • Python编写访问数据库程序的框架,主要包括以下步骤:
    1. 导入MySQLdb对象
    2. 获取connection对象
    3. 获取普通游标或者字典游标
    4. 执行SQL语句
    5. 从游标对象中取出数据,对数据做其他处理;
    6. 关闭连接

视频在优酷的地址:http://v.youku.com/v_show/id_XODIxNzQ1MjQ0.html

Python操作MySQL视频教程第三讲 – 实例代码演示

第三讲的视频教程讲解的主要内容是:

  • Python编写MySQL程序的框架
    • 引入模块:import MySQLdb
    • 获取连接:conn = MySQLdb.connect()
    • 获取游标:cursor = conn.cursor()
    • 执行SQL:cursor.execute()
    • 获取数据:curosr.fetchall()
    • 关闭连接:conn.close()
  • MySQL的Innodb和Myisam引擎的区别
    • innodb支持事务,myisam不支持事务
    • 如果访问的是innodb数据库,并执行了insert、delete、update语句,python代码中必须执行conn.commit()才能使得SQL执行生效

视频在优酷的地址:http://v.youku.com/v_show/id_XODI4MjE4Njgw.html

本文的代码和PPT在git上的地址:http://git.oschina.net/peishuaishuai/python-mysql-tutorial

本文的高清视频随后会发布在百度网盘,敬请期待。

本文地址:http://crazyant.net/1664.html ,转载请注明来源。

MySQL执行Select语句将结果导出到文件的方法

如果是对MySQL整个表数据导出,可以参照文章:http://crazyant.net/1355.html

然而也会遇到的场景是,需要执行一个SQL语句,然后将SQL语句的结果输出到文件;

方法一:使用MySQL的select * into outfile ‘/tmp/rs.txt’ from tb_name句型

这是个不可行的方法;

举个例子,执行以下的SQL语句:

mysql -h10.10.10.10 -ucrazyant -p123456 -P3306 -e "use test; select * into outfile '/tmp/rs.txt' from tb_test;"

这个SQL总会报出下面的错误:

ERROR 1045 (28000) at line 1: Access denied for user 'crazyant'@'10.10.10.10' (using password: YES)

原因是这个语句并不是在MySQL客户端,而是在MySQL的服务器上执行的,通常用于服务器管理员在服务器机器上进行数据备份使用,由于MySQL客户端账号并没有访问服务器机器本身的权限,所以这个SQL执行不会成功。

方法2:直接将SQL执行的结果重定向到文件即可

执行下面的命令,能够将SQL语句执行的结果输出到文件:

mysql -h10.10.10.10 -ucrazyant -p123456 -P3306 -Ne "use test; select * from tb_test;" > /tmp/rs.txt

其中-Ne是执行这个SQL语句的选项,-N代表输出SQL语句执行结果中不带第一行的字段名称,-e表示要执行SQL语句;

执行下面的命令,则可以执行SQL文件,并把结果输出到文件:

新建一个文件,名称为runsql.sql,内容为:

use test; select * from db_test;

然后这样执行命令:

mysql -h10.10.10.10 -ucrazyant -p123456 -P3306 -N < runsql.sql > /tmp/rs.txt

其中-N命令仍然表示不输出表头字段说明(第一行),小于号表示输入重定向,runsql.sql的文件内容会被发送给mysql的命令,大于号则表示输出重定向,会将命令执行的结果输出到文件;

总结:

  • select into outfile只能在MySQL服务器上执行,客户端上无法执行;
  • mysql -Ne “sql” > rs.txt可以将SQL语句执行后输出为文件
  • mysql -N < runsql.sql > rs.txt可以执行sql文件中的内容,然后将结果输出到文件;
  • mysql -N的选项,表示输出时不带表头

本文地址:http://crazyant.net/1587.html

 

MySQL 查看数据库中每个表占用的空间大小

转自:http://www.oschina.net/question/12_3673

1、进去指定schema 数据库(存放了其他的数据库的信息) 

mysql> use information_schema;
Database changed

2、查询所有数据的大小

mysql> select concat(round(sum(DATA_LENGTH/1024/1024), 2), 'MB')
    -> as data from TABLES;
+———–+
| data      |
+———–+
| 6674.48MB |
+———–+
1 row in set (16.81 sec)​

3、查看指定数据库实例的大小,比如说数据库 forexpert

mysql> select concat(round(sum(DATA_LENGTH/1024/1024), 2), 'MB')
    -> as data from TABLES where table_schema='forexpert';
+———–+
| data      |
+———–+
| 6542.30MB |
+———–+
1 row in set (7.47 sec)

4、查看指定数据库的表的大小,比如说数据库 forexpert 中的 member 表 

mysql> select concat(round(sum(DATA_LENGTH/1024/1024),2),'MB') as data
    -> from TABLES where table_schema='forexpert'
    -> and table_name='member';
+——–+
| data   |
+——–+
| 2.52MB |
+——–+
1 row in set (1.88 sec)

MySQL数据导入导出实例教程手册

mysqldump是mysql自带的一个数据导入导出工具,其官方注释为:

shell> mysqldump [options] db_name [tbl_name …]
shell> mysqldump [options] –databases db_name …
shell> mysqldump [options] –all-databases

使用mysqldump命令将整个数据库导出

mysqldump -h 127.0.0.1 -P 3306 -u root -p123456 crazyant –skip-lock-tables > crazyant.sql

其中-h后面是主机名,-P后面是端口号,-u后面是用户名,-p后面是密码;crazyant是要导出的数据库名;

由于导出过程中出现了这个错误:

mysqldump: Got error: 1044: Access denied for user 'root'@'127.0.0.1' to database 'crazyant' when using LOCK TABLE

所以在最后面,我加上了–skip-lock-tables这个选项,就没有错误了。

mysqldump命令导出时设定不需要导出的表

有时候一个数据库中有一些表我们不想导出,这时候可以使用mysqldump的–ignore-table命令来设定过滤的表,该参数的用法如下:

–ignore-table=name Do not dump the specified table. To specify more than one
                      table to ignore, use the directive multiple times, once
                      for each table.  Each table must be specified with both
                      database and table names, e.g.,
                      –ignore-table=database.table.

可以看出其用法:

  •  –ignore-table=database.table
  • 每次只能指定一个表,如果要指定多个表,该选项需要写多次

使用实例:

mysqldump -h 127.0.1.1 -P 3306 -u root -p123456 crazyant –skip-lock-tables –ignore-table=crazyant.table1 –ignore-table=crazyant.table2 –ignore-table=crazyant.table3 –ignore-table=crazyant.table4 > crazyant.sql

使用mysqldump命令只导出1个表或指定表的方法

也可以在mysqldump的database后面跟上要1个要导出的表,来只导出单个表。

mysqldump -h 127.0.0.1 -P 3306 -u root -p123456 –skip-lock-tables crazyant  table_a >dump_table_a.sql

也可以写上多个要导出的表,以空格分开它们即可:

mysqldump -h 127.0.0.1 -P 3306 -u root -p123456 –skip-lock-tables crazyant table_a table_b >dump_two_tables.sql

可以看出,只要用“database_name tablename1 tablename2 tablename3”的方式写出导出数据库的表即可。

使用mysqldump命令只导出表结构的方法

mysqldump还有一个选项能够只导出表的结构,而不导出表的内容:

-d, –no-data       No row information

使用实例:

mysqldump –opt -d  -h 127.0.0.1 -P 3306 -u root -p123456 crazyant –skip-lock-tables

使用source命令导入数据

使用mysql的source命令,可以将mysqldump的导出结果直接Load到数据表里面;

使用方法如下:

> mysql -h 127.0.1.1 -P 3306 -u root -p123456

mysql>  use crazyant;

mysql> source /home/crazyant.net/mysqldump_crazyant.sql

这样就完成了导入,如果是全量导入,有时候有必要使用truncate table的方式先将数据清空;

总结

使用MySQL自带的mysqldump和source命令,能够很方便的将数据库的数据导出,也能够将导出的数据导入到库中;

mysqldump官方文档地址:http://dev.mysql.com/doc/refman/5.1/en/mysqldump.html

转载请注明来源:http://crazyant.net/mysql/1355.html

MySQL一条语句更新多个表的方法

MySQL本身是支持一条update语句更新多个表的,有时候这是非常有用的一个特性。

Multiple-table syntax

UPDATE [LOW_PRIORITY] [IGNORE] table_references
    SET col_name1={expr1|DEFAULT} [, col_name2={expr2|DEFAULT}] …
    [WHERE where_condition]</pre>

于是继续找table_references说明; 

table_references:
    escaped_table_reference [, escaped_table_reference] …

escaped_table_reference:
    table_reference
  | { OJ table_reference }

table_reference:
    table_factor
  | join_table

table_factor:
    tbl_name [[AS] alias] [index_hint]
  | table_subquery [AS] alias
  | ( table_references )

可以看到,update的关键词可以写多个表,每个表也可以是个子查询、也可以是join语句。

一个小尝试

在我的另一篇文章中,我已经用到了该语法:

UPDATE table_a,table_b SET table_a.age=table_b.age WHERE table_a.id=table_b.id;

该语句中的table_b表也可以换成子查询、join子句,比如:

UPDATE table_a,(SELECT id,age FROM table_b) AS tb SET table_a.age=tb.age WHERE table_a.id=tb.id;

mysql update官方文档:http://dev.mysql.com/doc/refman/5.0/en/update.html

转载请注明来源:链接

mysql根据A表更新B表的方法

最近遇到一个需求:mysql中A表和B表都有(id, age)字段,现在想读取B表的age字段,将其update到A表对应ID的age字段中去,我直接想到了一种方案:用Python读取B表,获得{id:age}形式的数据,然后根据每个ID和age的值依次update A表。

两个表分别定义和数据如下:

A表定义:

Field Type Comment
id int(11)  
name varchar(20)  
age int(11)  

数据:

1,name1,0
2,name2,0
3,name3,0
4,name4,0
5,name5,0

B表定义

Field Type Comment
id int(11)  
age int(11)  

数据:

1,11
2,21
3,31
4,41
5,51

python代码来实现

# -*- encoding:utf8 -*-
'''
@author: crazyant.net
读取B表的(id, age)数据,然后依次更新A表;
'''
from common.DBUtil import DB

dbUtil = DB('127.0.0.1',3306,'root','','test')

rs = dbUtil.query("SELECT id,age FROM table_b")

for row in rs:
    (idv,age)=row
    print (idv,age)
    update_sql="update table_a set age='%s' where id='%s';"%(age,idv)
    print update_sql
    dbUtil.update(update_sql)

print 'over'

其实一条SQL语句就可以搞定

 

看了看代码,实在是简单,于是网上搜了一下mysql能不能根据一个表更新另一个表,结果发现update本身就支持多个表更新的功能。

UPDATE table_a,table_b SET table_a.age=table_b.age WHERE table_a.id=table_b.id;

用python代码就显得是大炮打蚊子多次一举了。

 

转载请注明来源:链接