Spark使用Java开发遇到的那些类型错误

Spark使用Java开发其实比较方便的,JAVA8的lambda表达式使得编写体验并不比Scala差很多,但是因为Spark本身使用Scala实现,导致使用Java开发的时候,也遇到不少的类型匹配问题。

本文列举出自己在工作开发中遇到的一些问题,供大家参考:

WrappedArray和Vector

报错信息为:Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.ml.linalg.Vector

当使用DataFrame打印Schema的时候,是这样的输出:

 |-- tag_weights: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- word_sims: array (nullable = true)
 |    |-- element: double (containsNull = true)

 

这时候如果Java用Vector接收,就会报这个错误,JAVA代码为:

spark.udf().register(
                "computeWeightSim",
                new UDF2<Vector, Vector, Double>() {
                    @Override
                    public Double call(Vector tag_weights, Vector word_sims) throws Exception {

解决办法是使用WrappedArray<Long>来接收,这是个scala的类型,可以用Iterator做遍历:

scala.collection.Iterator<Long> it1 = view_qipuids.iterator();
scala.collection.Iterator<Long> it2 = view_cnts.iterator();

Map<Long, Long> viewMap = new HashMap<>();
while (it1.hasNext() && it2.hasNext()) {
    viewMap.put(it1.next(), it2.next());
}

或者可以zip两个iterator进行计算:

new UDF2<WrappedArray<Double>, WrappedArray<Double>, Double>() {
    /**
     * 计算加权权重
     * @param tag_weights 加权
     * @param word_sims 计算结果目标
     * @return 加权权重
     * @throws Exception
     */
    @Override
    public Double call(WrappedArray<Double> tag_weights, WrappedArray<Double> word_sims) throws Exception {
        scala.collection.Iterator<Double> tag_weightsIter = tag_weights.iterator();
        scala.collection.Iterator<Double> word_simsIter = word_sims.iterator();

        scala.collection.Iterator<Tuple2<Double, Double>> zipIterator = tag_weightsIter.zip(word_simsIter);

        double totalWeight = 0.0;
        double fenziWeight = 0.0;
        while (zipIterator.hasNext()) {
            Tuple2<Double, Double> iterTuple = zipIterator.next();
            totalWeight += iterTuple._1;
            fenziWeight += iterTuple._1 * iterTuple._2;
        }

        if (totalWeight == 0.0) {
            return 0.0;
        } else {
            return fenziWeight / totalWeight;
        }
    }
}

 

详细内容见scala的文档:https://docs.scala-lang.org/overviews/collections/iterators.html

 

Spark使用word2vec训练item2vec实现内容相关推荐

之前使用spark als训练协同过滤,然后导出itemvectors做相似度计算,后来学到了可以用word2vec实现item2vec的训练效果貌似更好,试了一下果然不错;

spark版本:2.3.1,开发语言为JAVA

几大步骤

  1. 读取查看、点击、播放等行为数据,我用的是播放数据;
  2. 数据整理成(userid, itemid, playcnt)的形式,这个数据可能是聚合N天得到的;
  3. 过滤掉playcnt为小于3的数据,我把这些过滤掉,觉得这个数据没有贡献;
  4. 按照userid聚合,得到(userid, list(itemid))的形式;
  5. 训练word2vec;
  6. 导出model.vectors(),里面包括word和对应的向量vector,其中word其实就是itemid
  7. crossjoin计算两两相似度,取相似度TOP N;
  8. 将结果存入mysql,后续可以加载到REDIS实现实时相似推荐;

代码实现

读取播放数据:

Dataset<Row> playDatas = spark.sql(
        "select user_id, item_id, play_cnt " +
                "from hive_play_table group by user_id, item_id");

 

做数据按userid聚合:

playDatas = playDatas
        // 删除掉只播放3次以下的数据
        .filter("play_cnt>2")
        // 按userid聚合
        .groupBy("user_id")
        .agg(collect_list("item_id").as("item_ids"))
        // 至少操作过2个元素
        .where(size(col("item_ids")).geq(2));

 

训练word2vec:

Word2Vec word2Vec = new Word2Vec()
        .setInputCol("item_ids")
        .setOutputCol("word2vec_result")
        .setVectorSize(50)
        .setMinCount(0)
        .setMaxIter(50)
        .setSeed(123);

Word2VecModel word2VecModel = word2Vec.fit(playDatas);

 

实现df的cross join:

Dataset<Row> vectorsA = word2VecModel
        .getVectors()
        .select(
                col("word").as("itemIdA"),
                col("vector").as("vectorA"));

Dataset<Row> vectorsB = word2VecModel
        .getVectors()
        .select(
                col("word").as("itemIdB"),
                col("vector").as("vectorB"));

// self cross join
Dataset<Row> crossDatas = vectorsA.crossJoin(vectorsB);

 

注册余弦相似度计算函数:

spark.udf().register(
        "vectorCosinSim",
        new UDF2<Vector, Vector, Double>() {
            @Override
            public Double call(Vector vectora, Vector vectorb) throws Exception {
                return SimilarityUtils.cosineSimilarity(vectora, vectorb);
            }
        },
        DataTypes.DoubleType
);

 

其中调用的余弦相似度计算函数,使用JAVA实现:

public static double cosineSimilarity(Vector featuresLeft, Vector featuresRight) {
    double[] dataLeft = featuresLeft.toArray();
    List<Float> lista = new ArrayList<>();
    if (dataLeft.length > 0) {
        for (double d : dataLeft) {
            lista.add((float) d);
        }
    }

    double[] dataRight = featuresRight.toArray();
    List<Float> listb = new ArrayList<>();
    if (dataRight.length > 0) {
        for (double d : dataRight) {
            listb.add((float) d);
        }
    }

    return cosineSimilarity(lista, listb);
}

 

实现相似度计算,并过滤掉自身和自身的计算:

crossDatas = crossDatas
        .withColumn(
                "cosineSimilarity", callUDF(
                        "vectorCosinSim", col("vectorA"), col("vectorB")))
        .select("itemIdA", "itemIdB", "cosineSimilarity")
        .filter(col("itemIdA").notEqual(col("itemIdB")));

 

使用 spark的Window,提取每个group的topn:

// 按照相似度倒序排列取TOP 300
WindowSpec windowSpec = Window.partitionBy("itemIdA").orderBy(col("cosineSimilarity").desc());
crossDatas = crossDatas
        .withColumn("simRank", rank().over(windowSpec))
        .where(col("simRank").leq(200));

 

将数据聚合成每个Item的推荐列表的形式:

crossDatas = crossDatas
        .groupBy("itemIdA")
        .agg(
                collect_list("cosineSimilarity").as("columnSims"),
                collect_list("itemIdB").as("itemIds")
        ).select(
                col("itemIdA").as("item_id").cast(DataTypes.LongType),
                col("columnSims").as("column_sims").cast(DataTypes.StringType),
                col("itemIds").as("item_ids").cast(DataTypes.StringType)
        );

 

将数据覆盖写入MySQL:

crossDatas.write().mode(SaveMode.Overwrite).jdbc(
        MysqlConfig.ONLINE_MYSQL_MASTER_URL,
        "item2vec_sims",
        MysqlConfig.getOnlineProperties()
);

 

在数据库中,我们根据item_id,提取到item_ids,可以用于直接的推荐;其中column_sims也记录了对应的相似度权重,如果需要加权的话也可以直接提取;

欢迎大家关注我的爱奇艺号,学习Pyton大数据人工智能技术,地址

本文地址:http://www.crazyant.net/2447.html,转载请注明来源。

 

 

 

Java和Python使用Grpc访问Tensorflow的Serving代码

发现网上大量的代码都是mnist,我自己反正不是搞图像处理的,所以这个例子我怎么都不想搞;

wide&deep这种,包含各种特征的模型,才是我的需要,iris也是从文本训练模型,所以非常简单;

本文给出Python和Java访问Tensorflow的Serving代码。

Java版本使用Grpc访问Tensorflow的Serving代码

package io.github.qf6101.tensorflowserving;
 
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import org.tensorflow.example.*;
import org.tensorflow.framework.DataType;
import org.tensorflow.framework.TensorProto;
import org.tensorflow.framework.TensorShapeProto;
import tensorflow.serving.Model;
import tensorflow.serving.Predict;
import tensorflow.serving.PredictionServiceGrpc;
 
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
/**
 * 参考:https://www.jianshu.com/p/d82107165119
 * 参考:https://github.com/grpc/grpc-java
 */
public class PssIrisGrpcClient {
 
    public static Example createExample() {
        Features.Builder featuresBuilder = Features.newBuilder();
 
        Map<String, Float> dataMap = new HashMap<String, Float>();
        dataMap.put("SepalLength", 5.1f);
        dataMap.put("SepalWidth", 3.3f);
        dataMap.put("PetalLength", 1.7f);
        dataMap.put("PetalWidth", 0.5f);
 
        Map<String, Feature> featuresMap = mapToFeatureMap(dataMap);
        featuresBuilder.putAllFeature(featuresMap);
 
        Features features = featuresBuilder.build();
        Example.Builder exampleBuilder = Example.newBuilder();
        exampleBuilder.setFeatures(features);
        return exampleBuilder.build();
    }
 
    private static Map<String, Feature> mapToFeatureMap(Map<String, Float> dataMap) {
        Map<String, Feature> resultMap = new HashMap<String, Feature>();
        for (String key : dataMap.keySet()) {
            // // data1 = {"SepalLength":5.1,"SepalWidth":3.3,"PetalLength":1.7,"PetalWidth":0.5}
            FloatList floatList = FloatList.newBuilder().addValue(dataMap.get(key)).build();
            Feature feature = Feature.newBuilder().setFloatList(floatList).build();
            resultMap.put(key, feature);
        }
        return resultMap;
    }
 
    public static void main(String[] args) {
        String host = "127.0.0.1";
        int port = 8888;
 
        ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
                // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
                // needing certificates.
                .usePlaintext()
                .build();
        PredictionServiceGrpc.PredictionServiceBlockingStub blockingStub = PredictionServiceGrpc.newBlockingStub(channel);
 
        com.google.protobuf.Int64Value version = com.google.protobuf.Int64Value.newBuilder()
                .setValue(1)
                .build();
 
        Model.ModelSpec modelSpec = Model.ModelSpec.newBuilder()
                .setName("iris")
                .setVersion(version)
                .setSignatureName("classification")
                .build();
 
        List<ByteString> exampleList = new ArrayList<ByteString>();
        exampleList.add(createExample().toByteString());
 
        TensorShapeProto.Dim featureDim = TensorShapeProto.Dim.newBuilder().setSize(exampleList.size()).build();
        TensorShapeProto shapeProto = TensorShapeProto.newBuilder().addDim(featureDim).build();
        org.tensorflow.framework.TensorProto tensorProto = TensorProto.newBuilder().addAllStringVal(exampleList).setDtype(DataType.DT_STRING).setTensorShape(shapeProto).build();
 
        Predict.PredictRequest request = Predict.PredictRequest.newBuilder()
                .setModelSpec(modelSpec)
                .putInputs("inputs", tensorProto)
                .build();
        tensorflow.serving.Predict.PredictResponse response = blockingStub.predict(request);
        System.out.println(response);
 
        channel.shutdown();
    }
}

需要增加如下maven依赖:

        <!-- https://mvnrepository.com/artifact/org.tensorflow/tensorflow -->
        <dependency>
            <groupId>org.tensorflow</groupId>
            <artifactId>tensorflow</artifactId>
            <version>1.12.0</version>
        </dependency>
 
        <!-- https://mvnrepository.com/artifact/io.grpc/grpc-netty -->
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>1.20.0</version>
        </dependency>
 
        <!-- https://mvnrepository.com/artifact/io.grpc/grpc-protobuf -->
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>1.20.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.grpc/grpc-stub -->
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>1.20.0</version>
        </dependency>

输出结果:

outputs {
  key: "scores"
  value {
    dtype: DT_FLOAT
    tensor_shape {
      dim {
        size: 1
      }
      dim {
        size: 3
      }
    }
    float_val: 0.9997806
    float_val: 2.1938368E-4
    float_val: 1.382611E-9
  }
}
outputs {
  key: "classes"
  value {
    dtype: DT_STRING
    tensor_shape {
      dim {
        size: 1
      }
      dim {
        size: 3
      }
    }
    string_val: "0"
    string_val: "1"
    string_val: "2"
  }
}

Python版本使用Grpc访问Tensorflow的Serving代码

# 创建 gRPC 连接
import pandas as pd
from grpc.beta import implementations
import tensorflow as tf
from tensorflow_serving.apis import prediction_service_pb2, classification_pb2
 
#channel = implementations.insecure_channel('127.0.0.1', 8500):8888
channel = implementations.insecure_channel('127.0.0.1', 8888)
stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)
 
def _create_feature(v):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[v]))
 
data1 = {"SepalLength":5.1,"SepalWidth":3.3,"PetalLength":1.7,"PetalWidth":0.5}
features1 = {k: _create_feature(v) for k, v in data1.items()}
example1 = tf.train.Example(features=tf.train.Features(feature=features1))
 
 
data2 = {"SepalLength":1.1,"SepalWidth":1.3,"PetalLength":1.7,"PetalWidth":0.5}
features2 = {k: _create_feature(v) for k, v in data2.items()}
example2 = tf.train.Example(features=tf.train.Features(feature=features2))
 
# 获取测试数据集,并转换成 Example 实例。
examples = [example1, example2]
 
# 准备 RPC 请求,指定模型名称。
request = classification_pb2.ClassificationRequest()
request.model_spec.name = 'iris'
request.input.example_list.examples.extend(examples)
 
# 获取结果
response = stub.Classify(request, 10.0)
print(response)

Python代码看起来简单不少,但是我们的线上服务都是Java,所以不好集成的,只能做一些离线的批量预测;

输出如下:

result {
  classifications {
    classes {
      label: "0"
      score: 0.9997805953025818
    }
    classes {
      label: "1"
      score: 0.00021938368445262313
    }
    classes {
      label: "2"
      score: 1.382611025668723e-09
    }
  }
  classifications {
    classes {
      label: "0"
      score: 0.0736534595489502
    }
    classes {
      label: "1"
      score: 0.8393719792366028
    }
    classes {
      label: "2"
      score: 0.08697459846735
    }
  }
}
model_spec {
  name: "iris"
  version {
    value: 1
  }
  signature_name: "serving_default"
}

个人其实非常喜欢HTTP+JSON接口,完全不用搞这么多grpc这些麻烦的东西,尤其Java的grpc,遇到好多问题好崩溃;

不过号称grpc比http性能好不少,线上只能用grpc。

Spark使用JAVA编写自定义函数修改DataFrame

本文的代码涉及几个知识点,都是比较有用:

1、Spark用JAVA编写代码的方式;

2、Spark读取MySQL数据表,并且使用的是自定义SQL的方式,默认会读取整个表的;

3、Spark使用sql.functions的原有方法,给dataframe新增列、变更列;

4、Spark使用udf的自定义函数,给dataframe新增列、变更列;

/**
 * spark直接读取mysql
 */
private static Dataset<Row> queryMySQLData(SparkSession spark) {
    Properties properties = new Properties();
    properties.put("user", "root");
    properties.put("password", "12345678");
    properties.put("driver", "com.mysql.jdbc.Driver");
    // 可以写SQL语句查询数据结果
    return spark.read().jdbc(
            "jdbc:mysql://127.0.0.1:3306/test"
            , "(select id, name from tb_data) tsub",
            properties);
}

整个函数使用spark.read().jdbc读取mysql数据表,配置了mysql的user、passpord、driver,jdbcurl,以及可以通过sql语句执行数据查询,sql语句这里在spark源文档是table name,如果只设置table name,则会读取整个表,可以使用(select id, name from tb_data) tsub的方式读取SQL结果,注意的是这里必须给SQL语句设定一个标的别名。

以下是几种给dataframe添加新列、修改原有列的方法

方法1:使用functions中的函数,有一些局限性

// 方法1:使用functions中的函数,有一些局限性
inputData.withColumn("name_length_method1", functions.length(inputData.col("name")));

使用的是sql.functions里面的方法,里面支持了大部分的size、length等等方法,不过还是不够灵活,因为不支持就是不支持;

方法2:自定义注册udf,可以用JAVA代码写处理

可以先用spark.udf().register注册方法,然后使用functions.callUDF进行调用,其中自定义方法需要实现UDF1~UDF20的接口,分别代表传入不同的入参列:

// 方法2:自定义注册udf,可以用JAVA代码写处理
spark.udf().register(
        "getLength",
        new UDF1<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return s.length();
            }
        },
        DataTypes.IntegerType);

inputData = inputData.withColumn(
        "name_length_method2",
        functions.callUDF("getLength",
                inputData.col("name"))
);

// 方法2.1:可以写UDF2~UDF20,就是把输入字段变成多个
spark.udf().register(
        "getLength2",
        new UDF2<Long, String, Long>() {

            @Override
            public Long call(Long aLong, String s) throws Exception {
                return aLong + s.length();
            }
        },
        DataTypes.LongType);

inputData = inputData.withColumn(
        "name_length_method3",
        functions.callUDF(
                "getLength2",
                inputData.col("id"),
                inputData.col("name"))
);

inputData.show(20, false);

代码地址见:github地址

tb_data的mysql表数据读取后的原始dataframe的schema:

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

数据:

+---+-----------+
|id |name       |
+---+-----------+
|1  |name1      |
|2  |name22     |
|3  |name333    |
|4  |name4444   |
|5  |name55555  |
|6  |name666666 |
|7  |name7777777|
+---+-----------+

最终计算之后的数据输出:

+---+-----------+-------------------+-------------------+-------------------+
|id |name       |name_length_method1|name_length_method2|name_length_method3|
+---+-----------+-------------------+-------------------+-------------------+
|1  |name1      |5                  |5                  |6                  |
|2  |name22     |6                  |6                  |8                  |
|3  |name333    |7                  |7                  |10                 |
|4  |name4444   |8                  |8                  |12                 |
|5  |name55555  |9                  |9                  |14                 |
|6  |name666666 |10                 |10                 |16                 |
|7  |name7777777|11                 |11                 |18                 |
+---+-----------+-------------------+-------------------+-------------------+

 

快速找到Tomcat中最耗CPU的线程

1、找出TOMCAT的JVM的进程ID

[work@112542000 ~]$ jps                                                                                                                                         
290 Bootstrap                                                                                                                                                   
61213 Jps                                                                                                                                                       

2、查看该进程中,最耗费CPU的线程

[work@112542000 ~]$ ps -mp 290 -o THREAD,tid,time | sort -k2 -r | head -n 20                                                                                    
USER     %CPU PRI SCNT WCHAN  USER SYSTEM   TID     TIME                                                                                                        
work     15.3   -    - -         -      -     - 03:11:58                                                                                                        
work      1.2  19    - -         -      -   326 00:15:45                                                                                                        
work      0.2  19    - -         -      -   872 00:03:44                                                                                                        
work      0.1  19    - -         -      -   992 00:01:44                                                                                                        
work      0.1  19    - -         -      -   972 00:01:16                                                                                                        
work      0.1  19    - -         -      -   870 00:01:19                                                                                                        
work      0.1  19    - -         -      -   869 00:01:34                                                                                                        
work      0.0  19    - -         -      -  9993 00:00:00                                                                                                        
work      0.0  19    - -         -      -   997 00:00:06                                                                                                        
work      0.0  19    - -         -      -  9969 00:00:00                                                                                                        
work      0.0  19    - -         -      -  9968 00:00:00                                                                                                        
work      0.0  19    - -         -      -   996 00:00:34                                                                                                        
work      0.0  19    - -         -      -  9960 00:00:00                                                                                                        
work      0.0  19    - -         -      -   995 00:00:00                                                                                                        
work      0.0  19    - -         -      -  9944 00:00:00                                                                                                        
work      0.0  19    - -         -      -   994 00:00:31                                                                                                        
work      0.0  19    - -         -      -  9936 00:00:00                                                                                                        
work      0.0  19    - -         -      -  9934 00:00:26                                                                                                        
work      0.0  19    - -         -      -  9933 00:00:21                                                                                                        

3、以TID==326为例,查看该线程的堆栈

首先,将十进制的326转换成十六进制,可以在线转换:

http://tool.oschina.net/hexconvert

结果等于146

使用jstack查询该线程堆栈:

[work@112542000 ~]$ jstack 290 | grep "0x160" -A 10                                                                                                             
"Timer-2955" daemon prio=10 tid=0x00007f39d3ecd000 nid=0x1602 in Object.wait() [0x00007f38fb273000]                                                             
   java.lang.Thread.State: TIMED_WAITING (on object monitor)                                                                                                    
        at java.lang.Object.wait(Native Method)                                                                                                                 
        at java.util.TimerThread.mainLoop(Timer.java:552)                                                                                                       
        - locked <0x00000007c2a0fc40> (a java.util.TaskQueue)                                                                                                   
        at java.util.TimerThread.run(Timer.java:505)                                                                                                            
                                                                                                                                                                
"Timer-2954" daemon prio=10 tid=0x00007f392481e000 nid=0x15e4 in Object.wait() [0x00007f38fbe7f000]                                                             
   java.lang.Thread.State: TIMED_WAITING (on object monitor)                                                                                                    
        at java.lang.Object.wait(Native Method)                                                                                                                 
        at java.util.TimerThread.mainLoop(Timer.java:552)                                                                                                       
--                                                                                                                                                              
"Timer-2132" daemon prio=10 tid=0x00007f39d367d000 nid=0x1603 in Object.wait() [0x00007f3933f46000]                                                             
   java.lang.Thread.State: TIMED_WAITING (on object monitor)                                                                                                    
        at java.lang.Object.wait(Native Method)                                                                                                                 
        at java.util.TimerThread.mainLoop(Timer.java:552)                                                                                                       
        - locked <0x00000007b44ef3c8> (a java.util.TaskQueue)                                                                                                   
        at java.util.TimerThread.run(Timer.java:505)                                                                                                            
                                                                                                                                                                
"Timer-2131" daemon prio=10 tid=0x00007f39d3b61800 nid=0x15e6 in Object.wait() [0x00007f3934d54000]                                                             
   java.lang.Thread.State: TIMED_WAITING (on object monitor)                                                                                                    
        at java.lang.Object.wait(Native Method)                                                                                                                 
        at java.util.TimerThread.mainLoop(Timer.java:552)                                                                                                       
--                                                                                                                                                              
"Timer-450" daemon prio=10 tid=0x00007f39d0728800 nid=0x160 in Object.wait() [0x00007f39b1312000]                                                               
   java.lang.Thread.State: TIMED_WAITING (on object monitor)                                                                                                    
        at java.lang.Object.wait(Native Method)                                                                                                                 
        at java.util.TimerThread.mainLoop(Timer.java:552)                                                                                                       
        - locked <0x00000007bddf8788> (a java.util.TaskQueue)                                                                                                   
        at java.util.TimerThread.run(Timer.java:505)                                                                                                            
                                                                                                                                                                
"Timer-449" daemon prio=10 tid=0x00007f39ec01f000 nid=0xffc7 in Object.wait() [0x00007f39af5f5000]                                                              
   java.lang.Thread.State: TIMED_WAITING (on object monitor)                                                                                                    
        at java.lang.Object.wait(Native Method)                                                                                                                 
        at java.util.TimerThread.mainLoop(Timer.java:552)                                                                                                       

 

参考资料:

http://scau-fly.iteye.com/blog/1884606

http://www.blogjava.net/hankchen/archive/2012/05/09/377735.html

 

本文地址:http://crazyant.net/2145.html

Java线程池ThreadPoolExecutor详解

1、线程池的工作原理?

  1. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
  2. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
    1. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    2. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
    3. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
    4. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
  4. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

这样的过程说明,并不是先加入任务就一定会先执行。假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,那么当加入 20 个任务时,执行的顺序就是这样的:首先执行任务 1、2、3,然后任务 4~13 被放入队列。这时候队列满了,任务 14、15、16 会被马上执行,而任务 17~20 则会抛出异常。最终顺序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。

2、线程池有哪些配置项?

线程池可以使用java.util.concurrent.ThreadPoolExecutor来创建,在该类中包含最全参数的构造函数如下:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

相应的入参就是线程池可以配置的参数:

  • corePoolSize :核心池的大小,如果调用了prestartAllCoreThreads()或者prestartCoreThread()方法,会直接预先创建corePoolSize的线程,否则当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;这样做的好处是,如果任务量很小,那么甚至就不需要缓存任务,corePoolSize的线程就可以应对;
  • maximumPoolSize:线程池最大线程数,表示在线程池中最多能创建多少个线程,如果运行中的线程超过了这个数字,那么相当于线程池已满,新来的任务会使用RejectedExecutionHandler 进行处理;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止,然后线程池的数目维持在corePoolSize 大小;
  • unit:参数keepAliveTime的时间单位;
  • workQueue:一个阻塞队列,用来存储等待执行的任务,如果当前对线程的需求超过了corePoolSize大小,才会放在这里;
  • threadFactory:线程工厂,主要用来创建线程,比如可以指定线程的名字;
  • handler:如果线程池已满,新的任务的处理方式

3、线程池的阻塞队列包含哪几种选择?

如果线程数超过了corePoolSize,则开始把线程先放到阻塞队列里,相当于生产者消费者的一个数据通道,有以下一些阻塞队列可供选择:

1. ArrayBlockingQueue
2. DelayQueue
3. LinkedBlockingQueue
4. PriorityBlockingQueue
5. SynchronousQueue

ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。

DelayQueue阻塞的是其内部元素,DelayQueue中的元素必须实现 java.util.concurrent.Delayed接口,该接口只有一个方法就是long getDelay(TimeUnit unit),返回值就是队列元素被释放前的保持时间,如果返回0或者一个负值,就意味着该元素已经到期需要被释放,此时DelayedQueue会通过其take()方法释放此对象,DelayQueue可应用于定时关闭连接、缓存对象,超时处理等各种场景;

LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。

PriorityBlockingQueue是一个没有边界的队列,它的排序规则和 java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中允许插入null对象。所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。

SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。

使用的最多的应该是LinkedBlockingQueue,注意一般情况下要配置一下队列大小,设置成有界队列,否则JVM内存会被撑爆!

4、如果线程池已经满了可是还有新的任务提交怎么办?

线程池已满的定义,是指运行线程数==maximumPoolSize,并且workQueue是有界队列并且已满(如果是无界队列当然永远不会满);

这时候再提交任务怎么办呢?线程池会将任务传递给最后一个参数RejectedExecutionHandler来处理,比如打印报错日志、抛出异常、存储到Mysql/redis用于后续处理等等,线程池默认也提供了几种处理方式见第5条目;

5、有哪些饱和策略可以使用?

饱和策略指的就是线程池已满情况下任务的处理策略,默认有以下几种:

  • 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时RejectedExecutionException。
  • 在 ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
  • 在 ThreadPoolExecutor.DiscardPolicy 中,不能执行的任务将被删除。
  • 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

当然也可以自己实现处理策略类,继承RejectedExecutionHandler接口即可,该接口只有一个方法:

void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

6、怎样优化线程池的配置?

如何合理配置线程池大小,仅供参考。

一般需要根据任务的类型来配置线程池大小:

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

如果是IO密集型任务,参考值可以设置为2*NCPU

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,

再观察任务运行情况和系统负载、资源利用率来进行适当调整。

其中NCPU的指的是CPU的核心数,可以使用Runtime.getRuntime().availableProcessors()来获取;

参考文章:

本文地址:http://crazyant.net/2124.html,转载请注明来源。

Tomcat内存分析相关方法(jmap和mat)

Linux环境命令行

首先,根据进程命令,获取运行的tomcat的进程ID

ps aux | grep tomcat | grep java | grep bsc

在第二列可以看到进程ID

然后使用jmap可以查看内存占比:

/home/work/bsc/java/jdk-1.7-7u60/bin/jmap -heap 7840

输出的结果包括了内存各个部分的占比:

[work@xxxx.com logs]$ /home/work/bsc/java/jdk-1.7-7u60/bin/jmap -heap 10433
Attaching to process ID 10433, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 24.60-b09

using parallel threads in the new generation.
using thread-local object allocation.
Concurrent Mark-Sweep GC

Heap Configuration:
   MinHeapFreeRatio = 40
   MaxHeapFreeRatio = 70
   MaxHeapSize      = 8388608000 (8000.0MB)
   NewSize          = 1048576000 (1000.0MB)
   MaxNewSize       = 1048576000 (1000.0MB)
   OldSize          = 5439488 (5.1875MB)
   NewRatio         = 2
   SurvivorRatio    = 8
   PermSize         = 134217728 (128.0MB)
   MaxPermSize      = 268435456 (256.0MB)
   G1HeapRegionSize = 0 (0.0MB)

Heap Usage:
New Generation (Eden + 1 Survivor Space):
   capacity = 943718400 (900.0MB)
   used     = 393328672 (375.1074523925781MB)
   free     = 550389728 (524.8925476074219MB)
   41.67860582139757% used
Eden Space:
   capacity = 838860800 (800.0MB)
   used     = 288471072 (275.1074523925781MB)
   free     = 550389728 (524.8925476074219MB)
   34.388431549072266% used
From Space:
   capacity = 104857600 (100.0MB)
   used     = 104857600 (100.0MB)
   free     = 0 (0.0MB)
   100.0% used
To Space:
   capacity = 104857600 (100.0MB)
   used     = 0 (0.0MB)
   free     = 104857600 (100.0MB)
   0.0% used
concurrent mark-sweep generation:
   capacity = 1288736768 (1229.03515625MB)
   used     = 706435104 (673.7090148925781MB)
   free     = 582301664 (555.3261413574219MB)
   54.816089797478334% used
Perm Generation:
   capacity = 134217728 (128.0MB)
   used     = 65034240 (62.021484375MB)
   free     = 69183488 (65.978515625MB)
   48.45428466796875% used

27139 interned Strings occupying 3055832 bytes.

如上可以清楚的看到内存配置信息、年轻代(包括eden、from、to)、老年代、永久代各自的内存信息和占比。

使用Eclipse的memory analysis tool工具进行详细分析

首先,使用命令将内存dump到文件:

/home/work/bsc/java/jdk-1.7-7u60/bin/jmap -dump:format=b,file=/home/work/tmp/bsc.bin 10433

MAT(Memory Analyzer Tool)工具是eclipse的一个插件,使用起来非常方便,尤其是在分析大内存的dump文件时,可以非常直观的看到各个对象在堆空间中所占用的内存大小、类实例数量、对象引用关系、利用OQL对象查询,以及可以很方便的找出对象GC Roots的相关信息,当然最吸引人的还是能够快速为开发人员生成内存泄露报表,方便定位问题和分析问题。

MAT工具的下载地址为: http://www.eclipse.org/mat/downloads.php

MAT插件的下载地址为: http://download.eclipse.org/mat/1.3/update-site/

mat_image

 

然后就可以查看内存中的数据,点击饼图,也可以看下占比最大的对象是什么,从而分析程序的行为。

本文地址:http://crazyant.net/1980.html

Log4j将不同Package的日志输出到不同的文件的方法

随着项目规模的越来越大,会不断的引入新的模块,不同的模块都会打印自己的日志,最后就造成日志根本没法查看,比如我自己的项目中,就存在以下这些日志:

  1. 接收外界消息的日志、对外发送消息的日志;
  2. 后台常驻线程的处理日志;
  3. 外部接口访问的参数、返回结果等接口日志;
  4. Service访问数据库产生的SQL日志;

这其中,消息日志和后台线程的日志数据量非常庞大,如果所有日志打印在一个文件中,使用tail -f log.log文件,会发现日志在快速的滚动,根本无法查看甚至定位某一个具体的SQL或者Service访问日志。

解决方法就是可以将不同的日志加以分类输出,这样相互的日志不影响,尤其重要的接口访问日志,能够很方便的定位和排查问题。

步骤1:在log4j.properties中配置

先贴一下我自己所有的log4j.properties配置:

log4j.rootLogger=INFO, console, file

log4j.appender.console=net.czt.log.AsyncConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%t] %-5p crazyant-web %-17c{2} (%13F:%L) %X{USER_ID}|%X{USER_IP}|%X{SERVER_ADDRESS}|%X{SERVER_NAME}|%X{REQUEST_URI}|%X{SESSION_ID} - %m%n
log4j.appender.console.bufferSize=10000
log4j.appender.console.encoding=UTF-8

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.file=/home/work/apache-tomcat-6.0.39/logs/crazyant.log
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.MaxFileSize=1GB
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%-5p] crazyant-web %d{yyyy-MM-dd HH:mm:ss,SSS} %X{USER_ID}|%X{USER_IP}|%X{SERVER_ADDRESS}|%X{SERVER_NAME}|%X{REQUEST_URI}|%X{SESSION_ID} method:%l%n%m%n
log4j.appender.file.bufferSize=10000
log4j.appender.file.encoding=UTF-8

log4j.logger.net.czt.crazyant.msg=DEBUG, message
log4j.additivity.net.czt.crazyant.msg=false
log4j.appender.message=org.apache.log4j.RollingFileAppender
log4j.appender.message.File=/home/work/apache-tomcat-6.0.39/logs/crazyant_message.log
log4j.appender.message.Append=true
log4j.appender.message.MaxFileSize=1GB
log4j.appender.message.MaxBackupIndex=5
log4j.appender.message.layout=org.apache.log4j.PatternLayout
log4j.appender.message.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%-5p][%c{1}] [%t] - %m%n
log4j.appender.message.encoding=UTF-8

log4j.logger.net.czt.crazyant.async.service=DEBUG, async
log4j.additivity.net.czt.crazyant.async.service=false
log4j.appender.async=org.apache.log4j.RollingFileAppender
log4j.appender.async.File=/home/work/apache-tomcat-6.0.39/logs/crazyant_async.log
log4j.appender.async.Append=true
log4j.appender.async.MaxFileSize=1GB
log4j.appender.async.MaxBackupIndex=5
log4j.appender.async.layout=org.apache.log4j.PatternLayout
log4j.appender.async.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%-5p][%c{1}] [%t] - %m%n
log4j.appender.async.encoding=UTF-8

log4j.logger.net.czt.orm.mybatis.SqlMonitorManager=DEBUG, showsql
log4j.additivity.net.czt.orm.mybatis.SqlMonitorManager=false
log4j.logger.net.czt.transaction.interceptor.SmartTransactionInterceptor=DEBUG, showsql
log4j.additivity.net.czt.transaction.interceptor.SmartTransactionInterceptor=false
log4j.appender.showsql=org.apache.log4j.RollingFileAppender
log4j.appender.showsql.File=/home/work/apache-tomcat-6.0.39/logs/crazyant_sql.log
log4j.appender.showsql.Append=true
log4j.appender.showsql.MaxFileSize=1GB
log4j.appender.showsql.MaxBackupIndex=5
log4j.appender.showsql.layout=org.apache.log4j.PatternLayout
log4j.appender.showsql.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%-5p][%c{1}] [%t] - %m%n
log4j.appender.showsql.encoding=UTF-8

log4j.logger.net.czt.crazyant.service=DEBUG, service
log4j.additivity.net.czt.crazyant.service=false
log4j.appender.service=org.apache.log4j.RollingFileAppender
log4j.appender.service.File=/home/work/apache-tomcat-6.0.39/logs/crazyant_service.log
log4j.appender.service.Append=true
log4j.appender.service.MaxFileSize=1GB
log4j.appender.service.MaxBackupIndex=5
log4j.appender.service.layout=org.apache.log4j.PatternLayout
log4j.appender.service.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%-5p][%c{1}] [%t] - %m%n
log4j.appender.service.encoding=UTF-8

 

在配置文件的下方,可以方便的看到,我将message(消息)、async(后端线程)、showsql(数据库日志)、service(接口调用)分别输出到了不同的日志文件。

其中的一些解释:

log4j.rootLogger=INFO, console, file

log4j有一个rootLogger和普通Logger的概念,默认情况下我们只需要一个rootLogger,就是所有的日志只会输出到这一个日志文件中。

 

看一下普通Logger的配置(以接口日志service为例):

  • log4j.logger.net.czt.crazyant.service=DEBUG, service
    • 这句中的”net.czt.crazyant.service”,表示该普通logger日志配置生效的package的完全路径
    • 其中色service,表示该普通logger的名字
  • log4j.additivity.net.czt.crazyant.service=false
    • 其中的”net.czt.crazyant.service”,和上面的相同,表示该配置项针对的package
    • 该句配置的意思,是不要将该package的日志输出到rootLogger日志中,只输出到自己配置的日志就行了;
  • log4j.appender.service=org.apache.log4j.RollingFileAppender,以及该配置段下面的配置项
    • 这里的”service”字符串,和上面的第一个配置项的”service”相同,表示对该普通Logger的配置;
    • 下方的配置项和rootLogger相同,表示每天输出文件、编码UTF8、分片规则、每行的输出模式等等

我自己遇到的问题,是上面的log4j.properties配置好以后,发现各个日志文件创建了,但是里面都没有内容,这是为啥呢?来看下面第二个注意的地方;

步骤2、输出日志时需要设定日志对象对应的具体Class

什么意思呢?上面的配置项中,有一个”net.czt.crazyant.service”的package字符串,那么我们自己想一下,log4j是怎样将不同package中的logger日志输出到不同文件呢,想一下会有两种方法:

  1. 采用intercepter或者aop的方式,log4j自己检测日志输出,检测到日志产生于哪个package,就将其输出到对应文件中;
  2. 由用户传一个Class参数,log4j获取该Class对应的Package,以此为准,来定位不同的日志文件;

看一下代码,显然log4j用的是后一种简单直接的方式:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MyClassImpl implements MyClass {
    /**
     * loger
     */
    private static final Log logger = LogFactory.getLog(MyClassImpl.class);
    
    /**
     * my func
     */
    public void myfunc() {
        logger.info("call method myfunc.");
    }
}

在logger = LogFactory.getLog(MyClassImpl.class)中,传入了使用该logger的Class参数,而该Class被反射取到的package地址,就是log4j用来输出日志的package地址。

这种做法也有强大的地方,方便逻辑上的日志归类,比如很多代码不属于一个package,但是它们逻辑上属于一起的,举个例子,消息的处理不只是接口调用Service这个package,可能还会调用发送msg的操作,如果想把msg的package中一些日志也输出到Service,那么在这个msg的logger初始化的时候,传入一个Serivice的Class就行了。

或者对于某一类的所有日志来说,它们所有的logger对象,都来自封装好的单个对象实例即可,而这个单个对象实例传入的参数只有一个,用于标识这个逻辑归类即可。

总结

在Log4j.properties中,支持package或者具体class的日志单独输出,但是也需要代码中logger初始化的时候,能和日志配置中的package对应上。

 

原创文章,转载请注明地址:http://crazyant.net/1931.html

数据处理中提升性能的方法-引入并发但是避免同步

背景

只要存在数据库,就会有后台批量处理数据的需求,比如数据表备份、定期清理、数据替换、数据迁移,对于批量处理来说,往往会涉及大量的查询、过滤、归类、聚合计算,在批量脚本中直接查询数据库往往性能太低,甚至会因为一个大型的SQL导致数据库锁表出现线上事故,因此一般采用先导出到文件,在文件上计算然后再导入,比如:

1、使用mysql -e “select * from table” > output.txt的方式,执行SQL,将结果导出到文件中;

2、针对文件,使用各种方式进行聚合、过滤、替换等计算,最后产出成需要使用的格式;

3、发布产出的文件,或者使用load data命令导入到数据库;

由于只是一次性的批量查询数据库导出到文件,然后针对文件进行计算,而不是每次都查询数据库,大量节省了网络的IO耗费,从而提升处理的速度。

然而得到了导出的文件之后,如果文件过大,或者计算逻辑复杂比如大量的调用了耗费CPU的正则匹配、聚合计算,那么单线程的处理会耗费大量的时间,这时候就可以引入并发处理,使得机器的CPU、内存、IO、网络等资源全部充分利用起来,大幅度降低处理时间。

引入多线程,拆分输入文件为多个,每个小文件启动一个处理线程

HADOOP的MAP-REDUCE的做法,是先将文件split成小分片文件,然后针对每个分片做计算,最后将每个分片的结果聚合在一起,然而由于HADOOP的调度、集群稳定性等各种原因,对于MB大小级别的文件处理,会发现速度非常慢,有时候甚至比单机单线程处理速度还慢,将单机单线程改成多线程,往往会发现令人惊讶的效果提升。

直观的做法,是使用主线程读取输入的单个大文件,然后将读取的结果分配给子线程处理,然后主线程做整合,这种方式因为多线程共用了单个文件的IO,需要加入对文件的同步机制,最后会发现性能瓶颈在这单个文件的读取同步之上。

可以将大文件分片成小文件,然后每个文件分配给单个线程单独处理,避免线程间的资源同步,每个线程会享用单独的CPU核、内存单元、文件句柄,处理速度能达到最快。

使用这种方式,可以用以下的步骤进行:

1、使用SHELL,将输入文件拆分成预定线程数目的份数,存放到一个目录中;

2、以输入文件的目录路径作为参数,编程语言JAVA/PYTHON读取该目录的所有文件,对于每个文件启动一个处理线程,进行处理;

3、SHELL将输出目录的所有文件,使用cat file* > output_file的方式,得到最终的计算结果

#
# 将输入文件拆分成多个小文件,启动多线程进行处理,输出结果文件
#
function run_multi_task(){
	# 开启多个异步线程
	SPLITS_COUNT=20
	# 输入文件总数
	source_file_lines_count=`cat ${input_file} | wc -l`
	# 计算出拆分的文件个数
	split_file_lines_count=$(( $source_file_lines_count / $SPLITS_COUNT ))
	# 进行文件拆分
	split -l $split_file_lines_count -a 3 -d ${input_file} ${input_dir}/inputFile_
	
	# 执行JAVA程序
	$JAVA_CMD -classpath $jar_path "net.crazyant.BackTaskMain" "${input_dir}" "${output_dir}" "${output_err_dir}"
	
	# 合并文件
	cat ${output_dir}/* > ${output_file}
}

run_multi_task

这里注意,拆分文件的时候,不能使用split按照大小进行拆分,因为会把输入文件中的行截断;

对应的JAVA程序,则是通过读取文件夹中文件列表的方法,每个文件单独启动一个线程:

public class BackTaskMain {
    public static void main(String[] args) {
        String inputDataDir = args[1];
        String outputDataDir = args[2];
        String errDataDir = args[3];
        
        File inputDir = new File(inputDataDir);
        File[] inputFiles = inputDir.listFiles();
        
        // 记录开启的线程
        List<Thread> threads = new ArrayList<Thread>();
        for (File inputFile : inputFiles) {
            if (inputFile.getName().equals(".") || inputFile.getName().equals("..")) {
                continue;
            }
            
            // 针对每个inputFile,生成对应的outputFile和errFile
            String outputSrcLiceFpath = outputDataDir + "/" + inputFile.getName() + ".out";
            String errorOutputFpath = errDataDir + "/" + inputFile.getName() + ".err";
            
            // 创建Runnable
            BackRzInterface backRzInterface = new BackRzInterface();
            backRzInterface.setInputFilePath(inputFile.getAbsolutePath());
            backRzInterface.setOutputFilePath(outputSrcLiceFpath);
            backRzInterface.setErrorOutputFpath(errorOutputFpath);
            
            // 创建Thread,启动线程
            Thread singleRunThread = new Thread(backRzInterface);
            threads.add(singleRunThread);
            singleRunThread.start();
        }
        
        for (Thread thread : threads) {
            try {
                // 使用thread.join(),等待所有的线程执行完毕
                thread.join();
                System.out.println(thread.getName() + " has over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("proccess all over");
    }
}

通过这种方式,将大文件拆分成小文件,启动多个线程,每个线程处理一个小文件,最终将每个小文件的结果聚合,就得到了最终产出,性能上却大幅提升。

若有依赖的资源,可以按线程先复制、拆分、克隆,防止依赖的资源成为性能瓶颈

在上面的代码中,BackRzInterface是每个线程启动时要使用的Runnable对象,可以看到用的是每次new的方式创建:

// 创建Runnable
BackRzInterface backRzInterface = new BackRzInterface();

这样每个处理线程依赖的BackRzInterface都是独立的,对这个Runnable代码的使用不会存在同步问题。

如果多线程处理中需要使用外部资源,最好想办法使得每个线程单独使用自己独占的资源,相互之间若不会存在冲突,可以实现最大化并发处理。

其他一些例子,比如:

  • 多线程用到了字典文件,那么方法是首先将字典文件复制多份,每个线程使用自己独占的字典,避免并发同步访问字典;
  • 多线程若需要统一ID发号,可以提前计算出每个输入文件的行数,然后依次生成第一个线程需要的ID范围、第二个线程需要的ID范围,这些不同的ID范围也可以分别生成不同的文件,这样每个线程会使用各自独立的ID资源,避免了多个线程单时刻访问单个ID发号服务,使得发号成为性能瓶颈的可能;
  • 多线程如果依赖相同的Service,如果可以每次new对象就每次new,如果Bean都是在Spring中管理,则将Service加上@Scope(“prototype”),或者将对象每次clone一下得到一个新对象,保证最终每个线程使用自己独占的对象。
  • 尽量使用函数式编程的思想,每个函数都不要产生副作用,不要修改入参,结果只能通过return返回,避免增加代码同步冲突的可能;

通过以上这些类似的方法,每次将可能需要同步访问的共享资源,通过复制、分片等手段得到不同份,每个线程单独访问自己那一份,避免同步访问,最终实现性能最优。

避免同步的终极方法:使用多进程进行实现资源隔离

如果将文件拆分成了多份,依赖的ID、词典等资源也相应提供了多份,但是发现代码中存在无法解决的代码级别同步,该怎么办呢?

相对于想尽办法解决代码中的同步问题来说,多线程和多进程之间的性能差别微乎其微,我们都知道线程会使用进程的资源,所以导致了线程之间存在竞争进程资源,但是对于进程来说,CPU、内存等硬件资源是完全隔离的,这时候将程序运行在多进程而不是多线程,反而能更好的提升性能。

对于一些支持多线程不好的语言,比如PHP,直接用这种多进程计算的方法,速度并不比支持多线程的JAVA、PYTHON语言差:

# 要拆分的文件数,也就是要启动的多进程数
SPLITS_COUNT=20

input_splits_dir="${input_dir}_splits"
output_splits_dir="${output_dir}_splits"
# 输入文件行数
source_file_lines_count=`cat ${input_file} | wc -l`
# 每个文件拆分的行数=总行数除以要拆分的文件个数(也就是对应进程的个数)
split_file_lines_count=$(( $source_file_lines_count / ${SPLITS_COUNT} ))
# 执行拆分,注意这里使用-l进行行级别拆分更好
split -l $split_file_lines_count -a 3 -d ${input_file} ${input_splits_dir}/inputfile_

process_idx=1
for fname in $(ls ${input_splits_dir}); do
	input_fpath=${input_splits_dir}/$fname
	ouput_fpath=${output_splits_dir}/$fname
	# 后台执行所有进程
	php "/php/main.php" "${input_fpath}" "${ouput_fpath}" &
	(( process_idx++ ))	
done

# 等待所有后台进程执行结束
wait

# 合并文件
cat $output_splits_dir/* > ${output_file}

上述代码中,使用shell的&符号,可以在后台同时启动多个进程,使用wait语法,可以实现多线程的Thread.join特性,等待所有的进程执行结束。

总结

对于输入文件的大小、计算的复杂度处于单机和集群计算之间的数据处理,使用并发处理最为合适,但是并发的同步处理却会降低多线程的性能,这时可以借助于输入文件复制拆分、依赖资源复制拆分切片等方法,实现每个线程处理自己的独占资源,从而最大化提升计算速度。而对于一些无法避免的代码同步冲突逻辑,可以退化为多进程处理数据,借助于SHELL的后台进程支持,实现进程级别的资源独占,最终大幅提升处理性能。

 

将Maven工程打包成可执行JAR包的方法

如果项目中有需要后台执行的任务,但是主要的逻辑都在Java代码中,那么我采用的方式是单独建立一个maven模块打成jar包,然后在linux后台通过命令执行Jar包的Main函数:

java -classpath backtask.jar "net.crazyant.RunWebService"

于是就有个前提,得将maven模块打包成jar包,原本很简单的事情,却出现了很多问题。

原始打包方式:使用maven-assembly-plugin

POM配置片段为:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
        </plugin>   
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.2.1</version>
            <configuration>
                <finalName>mdm-v3-backtasks</finalName>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-resources-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin> 
    </plugins>
</build>

这种打包方式,可以生成/target/mdm-v3-backtasks-1.0.0.2.jar的Jar包,在有些工程中是没有问题的,但是我的新工程打包后,却在运行时出现了如下问题:

Exception in thread "main" org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/schema/context]
Offending resource: class path resource [applicationContext-backtasks.xml]

        at org.springframework.beans.factory.parsing.FailFastProblemReporter.error(FailFastProblemReporter.java:68)
        at org.springframework.beans.factory.parsing.ReaderContext.error(ReaderContext.java:85)
        at org.springframework.beans.factory.parsing.ReaderContext.error(ReaderContext.java:80)
        at org.springframework.beans.factory.xml.BeanDefinitionParserDelegate.error(BeanDefinitionParserDelegate.java:318)
        at org.springframework.beans.factory.xml.BeanDefinitionParserDelegate.parseCustomElement(BeanDefinitionParserDelegate.java:1435)
        at org.springframework.beans.factory.xml.BeanDefinitionParserDelegate.parseCustomElement(BeanDefinitionParserDelegate.java:1428)
        at org.springframework.beans.factory.xml.DefaultBeanDefinitionDocumentReader.parseBeanDefinitions(DefaultBeanDefinitionDocumentReader.java:195)
        at org.springframework.beans.factory.xml.DefaultBeanDefinitionDocumentReader.doRegisterBeanDefinitions(DefaultBeanDefinitionDocumentReader.java:139)
        at org.springframework.beans.factory.xml.DefaultBeanDefinitionDocumentReader.registerBeanDefinitions(DefaultBeanDefinitionDocumentReader.java:108)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.registerBeanDefinitions(XmlBeanDefinitionReader.java:493)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:390)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:334)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:302)
        at org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:174)
        at org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:209)
        at org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:180)
        at org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:243)
        at org.springframework.context.support.AbstractXmlApplicationContext.loadBeanDefinitions(AbstractXmlApplicationContext.java:127)
        at org.springframework.context.support.AbstractXmlApplicationContext.loadBeanDefinitions(AbstractXmlApplicationContext.java:93)
        at org.springframework.context.support.AbstractRefreshableApplicationContext.refreshBeanFactory(AbstractRefreshableApplicationContext.java:130)
        at org.springframework.context.support.AbstractApplicationContext.obtainFreshBeanFactory(AbstractApplicationContext.java:537)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:451)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)

网上搜了很多文章,发现首先大家都支持的,是maven-assembly-plugin插件,替换成maven-shade-plugin插件。

但是替换成maven-shade-plugin之后,仍然遇到了很多问题:

使用maven-shade-plugin逐步解决问题

使用maven-shade-plugin插件,并没有一下子就把问题解决了,也经过了很多步骤:

1、修改spring.xml的schema编写方式

原始的声明是这样的:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd">

作如下修改,把xsd的版本加上:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-3.0.xsd">

2、引入maven-shade-plugin插件的打包方式遇到的问题

将原来的maven-assembly-plugin替换成新的打包方式:

<build>
	<finalName>mdm-v3-backtasks</finalName>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-jar-plugin</artifactId>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.4.2</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-resources-plugin</artifactId>
			<version>2.6</version>
			<configuration>
				<encoding>UTF-8</encoding>
			</configuration>
		</plugin>
	</plugins>
</build>

然后就报了下面的错:

Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
        at sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:286)
        at sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:239)
        at java.util.jar.JarVerifier.processEntry(JarVerifier.java:317)
        at java.util.jar.JarVerifier.update(JarVerifier.java:228)
        at java.util.jar.JarFile.initializeVerifier(JarFile.java:348)
        at java.util.jar.JarFile.getInputStream(JarFile.java:415)
        at sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:775)
        at sun.misc.Resource.cachedInputStream(Resource.java:77)
        at sun.misc.Resource.getByteBuffer(Resource.java:160)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:436)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)

在http://zhentao-li.blogspot.com/2012/06/maven-shade-plugin-invalid-signature.html找到了解决方法:

You need to add the following to pom.xml:

        <configuration>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>

于是加入以上的filter,新的POM内容为:

<build>
	<finalName>mdm-v3-backtasks</finalName>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-jar-plugin</artifactId>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.4.2</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<filters>
							<filter>
								<artifact>*:*</artifact>
								<excludes>
									<exclude>META-INF/*.SF</exclude>
									<exclude>META-INF/*.DSA</exclude>
									<exclude>META-INF/*.RSA</exclude>
								</excludes>
							</filter>
						</filters>
					</configuration>
				</execution>
			</executions>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-resources-plugin</artifactId>
			<version>2.6</version>
			<configuration>
				<encoding>UTF-8</encoding>
			</configuration>
		</plugin>
	</plugins>
</build>

修改后提交,不幸的是,又爆出了下面的错误:

Exception in thread "main" org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/schema/context]
Offending resource: class path resource [applicationContext-backtasks.xml]

        at org.springframework.beans.factory.parsing.FailFastProblemReporter.error(FailFastProblemReporter.java:68)
        at org.springframework.beans.factory.parsing.ReaderContext.error(ReaderContext.java:85)
        at org.springframework.beans.factory.parsing.ReaderContext.error(ReaderContext.java:80)
        at org.springframework.beans.factory.xml.BeanDefinitionParserDelegate.error(BeanDefinitionParserDelegate.java:318)
        at org.springframework.beans.factory.xml.BeanDefinitionParserDelegate.parseCustomElement(BeanDefinitionParserDelegate.java:1435)
        at org.springframework.beans.factory.xml.BeanDefinitionParserDelegate.parseCustomElement(BeanDefinitionParserDelegate.java:1428)
        at org.springframework.beans.factory.xml.DefaultBeanDefinitionDocumentReader.parseBeanDefinitions(DefaultBeanDefinitionDocumentReader.java:195)
        at org.springframework.beans.factory.xml.DefaultBeanDefinitionDocumentReader.doRegisterBeanDefinitions(DefaultBeanDefinitionDocumentReader.java:139)
        at org.springframework.beans.factory.xml.DefaultBeanDefinitionDocumentReader.registerBeanDefinitions(DefaultBeanDefinitionDocumentReader.java:108)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.registerBeanDefinitions(XmlBeanDefinitionReader.java:493)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:390)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:334)
        at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:302)
        at org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:174)
        at org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:209)
        at org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:180)
        at org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:243)
        at org.springframework.context.support.AbstractXmlApplicationContext.loadBeanDefinitions(AbstractXmlApplicationContext.java:127)
        at org.springframework.context.support.AbstractXmlApplicationContext.loadBeanDefinitions(AbstractXmlApplicationContext.java:93)
        at org.springframework.context.support.AbstractRefreshableApplicationContext.refreshBeanFactory(AbstractRefreshableApplicationContext.java:130)
        at org.springframework.context.support.AbstractApplicationContext.obtainFreshBeanFactory(AbstractApplicationContext.java:537)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:451)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)

最后在http://robert-reiz.com/2011/11/14/832/页面找到了答案:

这是因为项目中包含了很多的Spring Jar,不同的Spring Jar会有相同的文件名称,他们相互冲突,为了避免元数据文件的相互覆盖,应该合并他们,如果使用maven shade plugin的话,可以在POM中加上下面的信息来解决:

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  <resource>META-INF/spring.handlers</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  <resource>META-INF/spring.schemas</resource>
</transformer>

对该问题,官方的解释位于:http://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html

在“Merging Content of Specific Files with AppendingTransformer and XmlAppendingTransformer”小节,解释为:

很多的JAR包含了相同的文件名称,为了避免相互覆盖,可以将他们合并到单个文件中。一个很好的例子,就是spring-context包和plexus-spring包,他俩都有META-INF/spring.handlers文件,这个文件被Spring用来处理XML Schema namespaces,通过如下所示的merge方法,可以解决这个问题

问题终于找到了,原来在这里,看下官方贴出来的POM:

<project>
  ...
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.2</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/spring.handlers</resource>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/spring.schemas</resource>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  ...
</project>

最终的解决方案

最终的POM文件如下所示:

<build>
	<finalName>mdm-v3-backtasks</finalName>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-jar-plugin</artifactId>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.4.2</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<transformers>
							<transformer
								implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
								<resource>META-INF/spring.handlers</resource>
							</transformer>
							<transformer
								implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
								<resource>META-INF/spring.schemas</resource>
							</transformer>
						</transformers>
						<filters>
							<filter>
								<artifact>*:*</artifact>
								<excludes>
									<exclude>META-INF/*.SF</exclude>
									<exclude>META-INF/*.DSA</exclude>
									<exclude>META-INF/*.RSA</exclude>
								</excludes>
							</filter>
						</filters>
					</configuration>
				</execution>
			</executions>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-resources-plugin</artifactId>
			<version>2.6</version>
			<configuration>
				<encoding>UTF-8</encoding>
			</configuration>
		</plugin>
	</plugins>
</build>

使用本POM配置,打包、运行均成功没有出错。

参考链接:

官方:http://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html

安全问题:http://zhentao-li.blogspot.com/2012/06/maven-shade-plugin-invalid-signature.html

Context问题:http://robert-reiz.com/2011/11/14/832/

 

本文地址:http://crazyant.net/1886.html,转载请注明来源