Baimoon's Note


  • 首页

  • 分类

  • 归档

  • 标签

redis_lua

发表于 2017-03-08

Shell Study

发表于 2017-02-23   |   分类于 unix

本文记录一些自己在使用shell进行批量操作、任务调度等工作时用到的一些shell的基础知识,在此记录以备翻阅和查找。

阅读全文 »

Java execute command

发表于 2017-02-08   |   分类于 Java

在Java的使用过程中,难免需要去执行linux命令(执行shell也是linux命令),那么应该如何做呢?本文将进行一些演示。

所依赖的相关类

要在Java中执行linux命令有两种方式,依赖于三个类。我们先介绍这三个类,然后在使用这三个类,组合两种方案来进行说明。

java.lang.Process

概括

ProcessBuilder.start()和Runtime.exec方法创建一个本地进程,并返回一个Process子类的实例,该实例用来控制进程并获得相关信息。Process类提供了执行
当Process对象没有更多的引用时,不是删除子进程,而是继续异步执行子进程。

阅读全文 »

Python Datetime

发表于 2017-02-04   |   分类于 python

在使用python写调度任务的时候,离不开的必然有日期和时间的处理;最常见的有根据字符串生成时间、将时间生成指定格式的字符串、日期时间的计算(加减)等等。在python中对日期时间进行操作的包为datetime。下面就对该包的一些常用操作和对应的参数进行介绍。

阅读全文 »

Python Subprocess

发表于 2017-02-04   |   分类于 python

在平时python的使用过程中,难免会遇到调用服务器命令的时候。直接调用普通的命令基本上都没有什么问题,令人比较麻烦的是带有控制台的命令,例如python、beeline、spark-shell。虽然向python、spark都有相关的脚本文件或者jar来避免直接使用控制台命令的调用,然后有些时候还是不免会用到控制台的方式,那么对于带有控制台的命令行应该如何实现呢?本文将使用subprocess,并以beeline为背景来实现使用python执行带有控制台的命令行命令。
首先看看参考代码,代码是以执行Hive的beeline命令行为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class beeline:
def __init__(self):
print "# 1 建立连接"
self.p = subprocess.Popen(['apache-hive-0.14.0-bin/bin/beeline'], stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
print >> self.p.stdin, '!connect jdbc:hive2://hdfs001:2181,hdfs002:2181,hdfs003:2181,hdfs004:2181,hdfs005:2181/;serviceDiscoveryMode=zookeeper userName password\n'
self.p.stdin.flush()
def submit(self, hql):
print "# 2 输入命令"
print >> self.p.stdin, hql
self.p.stdin.flush()
print "# 3 等待关闭"
print >> self.p.stdin, "!q"
self.p.wait()
def hadoop_get(self, from_, to_):
print "# 4 下载数据"
(status, output) = commands.getstatusoutput(" ".join(("hadoop-2.6.0/bin/hadoop fs -text", from_+'*', '>', to_)))
print status, output
def queryDataByDate(start_date, end_date, local_path):
sql = """
create table database.table_%s_%s
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '-@-'
NULL DEFINED AS '...'
STORED AS TEXTFILE
AS
SELECT * FROM DB.TABLE_NAME;
"""
b = beeline()
s = sql % (start_date, end_date, start_date, end_date)
b.submit((sql % (start_date, end_date, start_date, end_date)))
fileName = 'feed_%s_%s' % (start_date, end_date)
b.hadoop_get(("HDFS_PATH/%s/" % (fileName)), ("LOCAL_PATH/%s" % fileName))
if __name__ == '__main__':
if len(sys.argv) < 2:
print "请输入要获取feed的开始日期和结束日志,如:20160105"
exit(0)
print "正在执行%s文件,来查询%s-%s之间的数据:" % (sys.argv[0], sys.argv[1], sys.argv[2])
queryDataByDate(sys.argv[1], sys.argv[2], "/data/")

该代码块的主要流程是,在初始化beeline对象时调用beeline命令,并进行连接(init方法中实现了全部的操作);然后是提交需要beeline执行的sql(submit方法中实现);最后是将sql执行的结果从HDFS中取到本地(hadoop_get方法中实现)。queryDataByData方法就是对beeline类中各个方法的一个集成调用。

Double-Array trie

发表于 2017-01-13   |   分类于 数据结构

本文主要用来学习Double-Array trie的相关知识。

源码的github地址

Double-Array trie是Trie结构的压缩形式,仅用两个数组来表示Trie,这个结构有效的结合数字搜索树(Digital Search Tree)检索时间高效的特点和链式表示的Trie空间结构紧凑的特点。双数组Trie的本质是一个确定有限状态自动机(DFA),每个节点代表自动机的一个状态,根据不同的变量,进行状态转移,当到达结束状态或无法转移时,完成一次查询操作。在双数组所有键中,包含的字符之间的联系都是通过简单的数学加法运算表示的,不仅提高了检索速度,而且省去了链式结构中使用的大量指针,节省了存储空间。

在了解Double-Array trie之前,我们先了解一下“确定有限状态自动机”。在数学理论中,确定有限状态自动机或确定有限自动机(deterministic finite automation, DFA)是一个能实现状态转移的自动机。对于一个给定的属于该自动机的状态和一个属于该自动机字母表的字符,它能够根据实现给定的函数转移到下一个状态。简单的说,就是当前状态根据一个公式和状态的确定值,能够到达另外一个状态,而且要到达的状态是确定的。如图:
确定有限自动状态机
图中的每个字代表一个状态,并且每个状态都有一个固定的变量。

在了解了确定优先状态自动机之后,我们来了解一下Double-Array trie。Double-Array trie的核心是使用两个整型数组base和check来分别存储状态以及前驱状态。说的简单一些,base用来存储状态,check用来验证。在状态的转移过程中,有如下公式:

1
2
check[t]=s
base[s]+c=t //其中t和s是数组下标

上面的公式表示 base[s]的值 + 状态的变量 = t下标,check[t]的值 = s下标。

举例来说明:

在学习Douoble-Array trie和看DoubleArrayTrie源码的时候,参考了以下文章,在此表示感谢:
双数组Trie树(DoubleArrayTrie)Java实现
Double Array Trie

JPinYin

发表于 2016-12-30   |   分类于 NLP

本文主要介绍JPinYin的使用和配置,github的地址。

简单介绍

Jpinyin是一个开源的用于将汉字转换为拼音的java库。

主要特性

1、准确、完善的字库:Unicode编码从4E00-9FA5范围及3007(〇)的20903个汉字中,除了46个异体字(不存在标准拼音)Jpinyin都能转换。
2、拼音转换速度快:经测试,转换Unicode编码范围的20902个汉字,Jpinyin耗时约为100毫秒。
3、支持多种拼音格式:Jpinyin支持多种拼音输出格式:带声调、不带声调、数字表示声调以及拼音首字母格式输出。
4、常见多音字识别:Jpinyin支持常见多音字的识别,其中包括词组、成语、地名等;
5、简繁体中文互转。
6、支持用户自定义字典。

Maven依赖

1
2
3
4
5
<dependency>
<groupId>com.github.stuxuhai</groupId>
<artifactId>jpinyin</artifactId>
<version>1.1.8</version>
</dependency>

用法

1
2
3
4
5
6
String str = "你好世界";
PinyinHelper.convertToPinyinString(str, ",", PinyinFormat.WITH_TONE_MARK); // nǐ,hǎo,shì,jiè
PinyinHelper.convertToPinyinString(str, ",", PinyinFormat.WITH_TONE_NUMBER); // ni3,hao3,shi4,jie4
PinyinHelper.convertToPinyinString(str, ",", PinyinFormat.WITHOUT_TONE); // ni,hao,shi,jie
PinyinHelper.getShortPinyin(str); // nhsj
PinyinHelper.addPinyinDict("user.dict"); // 添加用户自定义字典
阅读全文 »

scikit image - a crash course on NumPy for images

发表于 2016-12-19   |   分类于 图像处理

本文用来学习scikit-image的官方文档的a crash course on NumPy for images,原链接

A crash course on NumPy for images

scikit-image是以NumPy数组的方式来操作图像。因此图像很大一部分的操作将是使用NumPy:

1
2
3
4
>>> from skimage import data
>>> camera = data.camera()
>>> type(camera)
<type 'numpy.ndarray'>

检索图像的几何以及像素数:

1
2
3
4
>>> camera.shape
(512, 512)
>>> camera.size
262144

检索关于灰度值的统计信息:

1
2
3
4
>>> camera.min(), camera.max()
(0, 255)
>>> camera.mean()
118.31400299072266

代表图片的NumPy数组可以是浮点数类型的不同整数。查看Image data type and what the mean获取关于这些类型的更多信息,以及scikit-image如何处理它们。

阅读全文 »

scikit image - getting started

发表于 2016-12-16   |   分类于 图像处理

本文用来学习scikit-image的官方文档的入门手册,原链接

Getting started

scikit-image是一个图像处理的Python包,它使用numpy数组来工作。这个包作为skimage被引入:

1
>>> import skimage

skimage的大多数函数将在子模块中找到:

1
2
>>> from skimage import data
>>> camera = data.camera()

一个包含子模块和函数的web页面可以在API reference中找到。
在scikit-image中,图片相当于一个NumPy数组,例如,一个2-D的数组表示了一个灰度的2-D图片

1
2
3
4
5
>>> type(camera)
<type 'numpy.ndarray'>
>>> # An image with 512 rows and 512 columns
>>> camera.shape
(512, 512)

skimage.data模块提供了一组返回示例图片的函数,这些图片可以用来快速学习scikit-image的函数:

1
2
3
4
5
>>> coins = data.coins()
>>> from skimage import filters
>>> threshold_value = filters.threshold_otsu(coins)
>>> threshold_value
107

当然,还可以使用skimage.io.imread()从图片文件来加载自己的图片信息,加载后的图片也是作为一个NumPy数组:

1
2
3
4
>>> import os
>>> filename = os.path.join(skimage.data_dir, 'moon.png')
>>> from skimage import io
>>> moon = io.imread(filename)

Log4j Configuration

发表于 2016-11-10   |   分类于 Logging

本文用来学习关于Log4j的配置(通过配置文件的方式),原文档连接

Configuration

将日志请求插入到application代码中需要相当多的计划和努力。观察表明,大约百分之四的代码用于记录日志。因此,即使是一般大小的application也将会有数以千计的日志片段嵌套在代码中。考虑到这个数量,如何不需要手动修改就能管理这些日志片段就显得十分重要。
配置Log4j 2版本,能够通过下面四种方法中的任意一种来完成:
1、通过一个以XML、JSON、YAML或properties格式的配置文件。
2、编程方式,通过创建一个ConfigurationFactory和Configuration实现。
3、编程方式,通过调用在Configuration接口中的APIs来添加组件到默认配置。
4、编程方式,通过调用内部Logger类的方法。
本文主要关注通过一个配置文件来配置Log4j。对于通过编程方式来配置Log4j,可以在Extending Log4j 2和Programmatic Log4j Configuration。
注意,不同于Log4j 1.x,Log4j 2的公共API没有公开关于添加、修改和移除appenders和filter的方法,或者以任何方式来操作配置。

阅读全文 »

Log4j Architecture

发表于 2016-11-10   |   分类于 Logging

本文主要是针对Log4j的2.x版本的文档的,链接

Architecture

Main Components

Log4j使用的类下面图表中展示。
Log4j Classes
使用Log4j的applications讲需要使用一个特定的名称向LogManager请求一个Logger。LogManager会定位到适当的LoggerContext,然后从LoggerContext中获取Logger。如果Logger必须被创建,它将与包含了如下内容的LoggerConfig进行关联:1)与Logger相同名称的LoggerConfig;b)父级package的名称的LoggerConfig;3)根级LoggerConfig。LoggerConfig对象根据配置中Logger的声明进行创建。LoggerConfig与日志事件的实际输出源联系在一起。

阅读全文 »

Apache Kafka

发表于 2016-09-28   |   分类于 Kafka

本文是Kafka 0.10.0文档的翻译,主要用于自学。

1 Getting Started

1.1 Introduction

Kafka是一个分布式的、分区的、备份的提交日志服务。它提供了一个消息传输系统的功能,但是使用了一个独特的设计。
那意味着什么?
首先我们浏览一下基本的消息队列术语:

  • Kafka以一种类型持续messages的提供称为topics。
  • 我们称那些publish message到一个Kafka topic的进程为producers。
  • 我们称那些subscribe到topics并处理被publish的message的进程为consumers。
  • kafka作为一个集群而运行,集群由一个或多个server组成,每个server成为一个broker。

因此,整体来看,producers通过网络发送messages到Kafka集群,同样Kafka又为consumers服务,像这样:
producer_consumer
clients和servers之间的通信是通过一个简单的、高性能的、跨语言的TCP协议完成的。我们为Kafka提供了一个Java client,但是clients在很多语言中都可用。

Topics and Logs

首先我们学习由Kafka提供的高级别的抽象 - topic。
一个topic是一种或一个提供的名称,用来publish message。对于每个topic,Kafka集群维持着一个分区日志,看起来像这样:
Anatomy of a Topic
每个partition是一个顺序的、不可变的连续添加的消息队列。partitions中的每个message分配一个序列id号,称为offset,用来唯一标识partition中的每条message。
Kafka集群保存所有publish过来的message-不管它们是否被消费,保存时长可配置。例如,如果日志保存设置为两天,那么一个message在publish后两天内是可以被消费的,但是两天之后,它将被删除以释放空间。kafka的性能对于不同数据大小是恒定有效的,因此很多的数据不是个问题。
实际上每个consumer仅有被保存的元数据是consumer在日志中的位置,称为offset。这个offset由consumer控制:通常一个consumer按照它读取的message,线性的推进它的offset,但是这个位置由consumer控制并且consumer能够以任意顺序消费message。例如一个consumer可以重置到一个原来的offset来重新处理。
这个特征的组合意味着Kafka consumer是非常廉价的 - 它们能够自由的来去,而不会影响集群或其他consumer。例如,你可以使用我们的命令行工具来tail任何topic的内容,而不需要任何已经存在的consumers改变它所消费的内容。
日志中的partitions有几个用途。首先,它们允许日志扩展到单个server所能容纳的日志大小之外。一个partition必须位于它所属的server上,但是一个topic可能有很多的partitions,因此它能够持有任意数量的数据。其次,它们的行为类似一个并行单元 - 汇聚更多于一点。

Distribution

日志的partitions在Kafka集群中跨server分布,每个server为一个共享的partition处理数据和请求。每个partition为了容灾,跨server保存多个备份,备份的数量可以配置。
每个partition有一个server扮演”leader”的角色,并有零个或多个servers扮演”followers”的角色。leader为partition处理所有的读和写的请求,而follower只是被动的复制leader。如果leader失效了,follower中的一个将自动成为新的leader。每个server为它自己一些的partitions扮演一个leader角色,为其他的partition扮演一个follower的角色,因此每个server在集群中的负载是很均衡的。

Producers

Producers将数据publish到它们选择的topics中。Producer负责哪些message被分配到topic的哪些partition中。这可以通过简单的轮转来完成以平衡负载,或者可以一致性的partition定义函数来完成(例如基于message的某些key)。在分区上用的更多的是第二种。

Consumers

传统的消息传输有两种模式:queuing和publish-subscribe。在队列方式中,一个consumers池从一个server中读取,每条message只会到达某个consumer;在发布-订阅方式中,message广播给所有的consumers。Kafka提供了单个consumer抽象,它概括了上面两种方式 - consumer group。
consumers使用一个consumer群名称来标识它们自己,每个publish到一个topic的message被传递到每个订阅了topic的consumer组的一个consumer实例。consumer实例能够在单独的进程或单独的机器上。
如果所有的consumer实例拥有相同的consumer组,那么工作方式与一个传统的跨consumers负载均衡的队列类似。
如果所有的consumer实例都有不同的consumer组,那么工作方式与发布-订阅类似,所有的message会广播给所有的consumer。
更常见的,尽管我们发现那些topics有少数量的consumer组,然而每个都是一个逻辑订阅者。每个组由多个consumer实例组成,这样具有扩展性和容灾性。这也是publish-subscrib的定义,只不过subscriber是一个consumer群,而不是单个进程。
相对于传统消息传输系统,Kafka有更强的顺序保证。
consumer-groups
一个传统队列在server上按顺序保存messages,如果多个consumer从队列中消费数据,那么server以message存储的顺序拿出message。然而,虽然server按照顺序拿出message,但是message以异步方式投递给consumers,因此它们可能在不同的consumer上以不同的顺序到达。这意味着在并行消费的情况中消息的顺序丢失了。消息传输系统通过一个”exclusive consumer”的概念来解决这个问题,它只允许一个进程从队列中消费,但是这意味着没有并行处理。
Kafka做的更好一些。通过一个并行概念-partition-在topics中,Kafka能够在一个consumer进程池上同时提供顺序保证和负载均衡。这是通过将topic中的partitions分配给consumer组中的consumers来完成的,因此每个partition有组中确切的一个consumer来消费。通过这样,我们确保consumer值读取那一个partition,并以顺序消费数据。因为有很多partitions在很多consumer实例上是均衡负载的。注意,一个consumer组中的consumer实例不能多余partitions的数量。
Kafka只是在一个partition中提供了一个整体的顺序,而不是在一个topic的不同partition之间。对于大多applications,每个分区的排序联合根据key划分数据的能力是充分的。如果你要求在message上有整体的顺序,这可以通过使用一个topic只有一个partition来完成,这也意味着每个consuemr组只有一个consumer进程。

Guarantees

在高层次上,Kafka给了如下的保证:

  • 由一个producer发送到一个特定topic partition的Messages将会以它们被发送的顺序添加。那就是,如果一个message M1与发送message M2的producer是一个,并且M1先被发送,那么M1将有一个比M2小的offset,并且要比M2更早的添加到日志中。
  • 一个consumer看到messages的顺序是messages存储的顺序。
  • 对于使用了复制因子为N的topic,在不丢失任何提交到log的messages丢失,我们允许最多N-1个server故障。

这些保证的更多细节在文档的design章节中给出。

阅读全文 »

Flume Install

发表于 2016-09-21   |   分类于 Flume

本文主要介绍自己在生产中使用flume的实际配置,以便以后查询。如果能够为他人提供参考,荣幸之至。
在Flume中分为三个部分source、channel和sink。source主要用于接收数据,sink用于写出数据,channel作为source和sink的连接、保存和转发使用。其中非常好用的是,channel可以使用Kafka,从而使得Flume具有了超强的存储能力,如果在加上可靠的source和sink,完全可以保证数据零丢失。
本文使用的例子中采用的是内存channel,这种channel的缺点是存储长度有限,重启数据丢失,有点就是速度快,低延迟。至于source,使用的是avro。最后是sink,因为我得目的是将数据写入到HDFS中,以后Hadoop集群或Spark集群进行计算,因此使用的hdfs类型的sink。

阅读全文 »

Ganglia Install And Config

发表于 2016-09-18   |   分类于 ganglia

本文是ganglia的安装和配置的笔记

Ganglia的安装

首先,ganglia由gmond、gmetad和gweb三部分组成。

gmond

gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要手机指标数据的节点主机上。它通过侦听/通告协议与集群内其他节点共享数据。
gmond的安装很简单,其所依赖的库,libconfuse、pkgconfig、PCRE和APR等在大多数现行的linux上都有安装。

1
sudo yum install ganglia-gmond

gmetad

gmetad (Ganglia Meta Daemon)是一种从其他gmetad或gmond源收集指标数据,并将数据以RRD格式存储到磁盘的服务。gmetad为从主机组收集的特定指标信息提供了简单的查询机制,并支持分级授权,使得创建联合检测域成为可能。
gmetad除了需要安装gmond所需的依赖之外,还需要RDDtool库。它用来存储和显示从其他gmetad和gmond源收集的时间序列数据。

1
sudo yum install ganglia-gmetad

gweb

完整的Ganglia不能缺少网络接口:gweb(Ganglia Web)。gweb是一种利用浏览器显示gmetad所存储数据的PHP前端。
Ganglia 3.4的Web接口是一个独立的发布包,其源代码也是独立的。gweb 3.4支持gmond/gmetad 3.4.x及以上版本;gweb未来版本可能需要与gmond/gmetad未来版本相匹配。建议安装或更新gweb的时候查看安装文档,以获取更多信息。
安装gweb需要如下需求:

  • Apache Web Server
  • PHP 5.2级更新版本
  • PHP JSON扩展的安装和启用

首先安装Apache和PHP

1
yum install httpd php

用户还需要启用PHP的JSON扩展,通过检查/etc/php.d/json.ini文件来检查JSON的扩展状态,如果已经启用扩展,文件中应该包含下面的语句:

1
extension=json.ini

下载最新的gweb(https://sourceforge.net/projects/ganglia/files/gweb/),然后编译Makefile来安装gweb2:

1
2
tar -xvzf ganglia-web-major.minor.release.tar.gz
cd ganglia-web-major.minor.release

阅读全文 »

Tuning Spark

发表于 2016-09-10   |   分类于 spark

本文是Tuning Spark文档的翻译,原文档请参考,本文主要用于个人学习。

Tuning Spark

因为大多数Spark计算是内存中的计算,因此集群中的任何资源都能够成为Spark程序的瓶颈:CPU、网络带宽或内存。通常,如果数据装载到内存中,瓶颈可能是网络带宽,但是有些时候,你还需要做一些调整,例如以序列化格式存储RDDs,以降低内存的使用。本指南覆盖了两个主题:数据序列化和内存调整,其中数据序列化对好的网络性能是至关重要的,并且还可以降低内存的使用。我们还会概述其他一些小的主题。

Data Serialization

序列化在任何分布式application的执行中扮演了很重要的角色。那些序列化缓慢的对象或消费很大数量byte的格式将极大的减慢计算。通常,这是优化一个Spark application首先要调整的东西。Spark的目标是在方便(允许你在你的操作中使用任何Java类型)和性能之间达到一个平衡。它提供了两种序列化库:

  • Java serialization: 默认,Spark使用Java的ObjectOutputStream框架进行序列化对象操作,能够和任何你实现了java.io.Serializable接口的类型一起工作。通过继承java.io.Externalizable,你能够更加近的控制你的序列化的执行。Java序列化是灵活的,但是通常是慢的,这导致对于很多类会有很大的序列化格式。
  • Kryo serialization: Spark能够使用Kryo库(版本2)来更快的序列化对象。相对于Java序列化Kryo显然是更快且更加简单的(通常是10倍还多),但是不支持所有的可序列化类型,并且需要你在程序中将你要使用的类进行注册以便获取更好的执行。

通过使用SparkConf初始化你的job,并调用conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)来转换为使用Kryo。这个设置不止为了wroker节点混洗数据配置序列化生成器,当序列化RDDs到磁盘时也有用。Kryo不作为默认序列化生成器的唯一原因是需要自定义注册,但是我们推荐在任何网络集中型application中使用它。
Spark自动的很多常用的核心Scala类包含Kryo序列化生成器,涵盖在Twitter chill库的AllScalaRegistrar中。
要使用Kryo注册你自己的自定义类,使用registerKryoClasses方法。

1
2
3
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo documntation描述了更加高级的注册选项,如添加自定义序列化编码器。
如果你的对象很大,你可能需要增加spark.kryoserializer.buffer配置的值。这个值需要足够大以便保存你要序列化的最大对象。
最后,如果你没有注册你的自定义,Kryo将仍然能够工作,但是它将存储每个对象的全类名,这样损耗很大。

Memory Tuning

在调整内存的用法中,有三种值得考虑:被你的对象使用的内存的总量(你可能想要将整个数据集装配到内存中)、访问这些对象的开销和垃圾回收的开销(如果你在这些对象)。
默认,Java对象是快速访问的,但是在它们字段的内部很容消费掉比原始数据多2-5倍的空间 。这是因为有如下原因:

  • 每个不同的Java对象有一个”object header”,它大概是是16个字节,并包含信息,诸如一个指向它的class的指针。对于一个带有非常少数据的对象(假设一个Int字段),这可能要比数据大很多。
  • Java Strings在原生的string数据上有一个大概40字节的开销(因为它们被存储到一个Chars数组中,并且保存了额外的数据,如长度),并且每个字符以两个字节存储,因为String内部使用的UTF-16进行编码。因此一个包含10个字符的字符串很容易消耗掉60个字节。
  • 常见的集合类,如HashMap和LinkedList,使用linked数据结构,每个数据项是一个包装对象(如Map.Entry)。这种对象不只有header,而且还有指针(通常是8个字节)指向列表中的下一个对象。
  • 原始类型常常以包装对象来存储,诸如java.lang.Integer。
阅读全文 »

Spark Streaming + Kafka Integration Guide

发表于 2016-09-08   |   分类于 spark

本文是Spark Streaming + Kafka Integration Guide文档的翻译,原文请参考。另外,本文主要用于个人学习使用。

Spark Streaming + Kafka Integration Guide

Apache Kafka是一个发布-订阅的消息队列,作为一个分布式的、分片的、副本提交的日志服务。这里解释如何配置Spark Streaming来从Kafka接收数据。有两种方法来达到这个目的 - 老的方法是使用Receiver和Kafka的高级API,还有一个新的实验性解决方法(在Spark 1.3版本中引入),不需要使用Receiver。它们有不同的编程模式、执行特征和语义保证,因此请仔细阅读。

Approach 1: Receiver-based Approach

这个方法使用一个receiver来接收数据。这个Receiver使用Kafka高级别consumer API来实现。和所有receivers一样,通过一个Receiver从Kafka接收到的数据被存储到Spark executors中,然后Spark Streaming启动jobs来处理数据。
然而,根据默认配置,这个解决方法会因为故障而丢失数据(查看receiver reliabillity。要确保零数据丢失,你需要额外的在Spark Streaming中启用Write Ahead Logs(从Spark 1.2版本中引入)。这将同步的将从Kafka接收到的数据到以写ahead日志的方式波存到分布式文件系统中(如HDFS),因此所有的数据能够从故障中恢复。查看Streaming programming guide中Deploying section获取关于Write Ahead Logs的详细信息)。
接下来,我们讨论如何在你的streaming application中使用这个方法。

1、Linking: 对于使用SBT或Maven项目描述的Scala或Java application,使用如下的坐标链接你的streaming application(查看programming guide中的[Linking section]获取更多信息)。

1
2
3
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.0.0

对于Python application,你需要在部署你的application时添加上面的库以及它的依赖。查看Deploying分项的内容。
2、Programming: 在streaming application代码中,导入KafkaUtils并创建一个输入DStream,如下:

1
2
3
4
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

你还可以指定key和value的类以及它们使用createStream的变体的相关解码类。查看API docs和example。

需要注意的几点:

  • Kafka中topic的partitions与Spark Streaming中RDDs生成的partitions没有关联。因此在KafkaUtils.createStream()中增加特定于topic的partition的数量只是增加使用的线程的数量,使用这些线程在单个receiver中对topic进行消费。它不会增加Spark数据的并发处理。参考主要文档获取它的更多信息。
  • 多个Kafka输入DStream能够使用不同的group和topics来创建,以便使用多个receiver来并行接收数据。
  • 如果你使用一个可靠的文件系统(像HDFS)启用了Write Ahead logs,接收到的数据已经被复制到日志中。因此,对于输入流存储的存储级别为StorageLevel.MEMORY_AND_DISK_SER(那就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER))。

3、Deploying: 对于任何Spark application,spark-submit被使用来启动你的application。然而,对于Scala/Java application和Python application之间有轻微的不同。
对于Scala和Java application,如果你使用SBT或Maven作为项目管理,那么打包spark-streaming-kafka-0-8_2.11和它的依赖到application的JAR中。确保spark-core_2.11和spark-streaming_2.11作为依赖进行标记,因为他们已经安装到Spark中了。然后,使用spark-submit来启动你的application(查看主要编程指南中的Deploying section)。
对于Python application,它缺少SBT/Maven项目管理,spark-streaming-kafka-0-8_2.11和它的依赖可以使用–packages直接添加到spark-submit(查看Application Submission Guide)。这样:

1
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 ...

或者,你能够从Maven repository下载Maven坐标的JAR并使用–jars将其添加到spark-submit。

Approach 2: Direct Approach (No Receivers)

这个新的五receiver的直接解决方法已经在Spark 1.3版本中引入,来确保健壮的端对端的保证。代替receivers来接收数据,这个方法周期性的查询Kafka来获取每个topic + partition中最后的offset,然后相应的定义每个batch中处理offset的范围。当处理数据的job启动时,使用Kafka的简单API从Kafka中读取定义的offset范围(类似从文件系统中读取文件)。注意这是一个在Spark 1.3中引入的实验性特征,用于Scala和Java API,在Spark 1.4中才有了Python的API。
这个解决方案在基于receiver的解决方案上有如下优势:

  • Simplified Parallelism: 不需要创建多个输入Kafka streams以及联合他们。使用directStream,Spark Streaming将会创建和要消费的Kafka partitions数量相同的RDD partitions,这样将病习惯你的从Kafka读取数据。因此在Kafka和RDD partitions之间有一个一对一的映射,这样更加容易理解和调整。
  • Efficiency: 在第一个解决方案中要实现零数据丢失需要将数据存储到Write Ahead Log中,这种方式是进一步的复制数据。这实际上是效率低的,因为数据实际上复制了两次-一次被Kafka,另一次被Write Ahead Log。第二种解决方案消除了这个问题,因为没有了receiver,因此不需要Write Ahead Log。只要你有足够的Kafka保留时间,message能够从Kafka中恢复。
  • Exactly-once semantics: 第一种解决方案使用Kafka的高级API将消费掉的offset存储到Zookeeper中。这是从Kafka消费数据的传统方式。而这种解决方案(和write ahead log混合)能够保证零数据丢失(例如至少一次的语义),在一些故障下,有很小的可能性是一些数据会消费两次。这种存在是因为由Spark Streaming可靠的接收的数据和由Zookeeper跟踪的offsets之间的矛盾造成的。因此,在第二解决方案中,我们使用了简单的Kafka API,简单的API没有使用Zookeeper。通过Spark Streaming中它的checkpoints来跟踪offset。这消除了Spark Streaming和Zookeeper/Kafka之间的差异,因此尽管有故障,但是被Spark Streaming接收到的记录实际上只有一次。为了达到输出你的结果只有一次的语义,你的输出操作将数据保存到外部存储中必须是幂等或原子事务的,这样来保存结果和offset(查看主编程指南中Semantics of output operations获取更多信息)。

注意,这种解决方案中一个不利条件是不会在Zookeeper中更新offset,因此那些基于Zookeeper的监控工具将不会更新进度。然而,你能够在这种解决方案中访问每个batch中处理过的offsets并自己更新到Zookeeper(参考下面)。
接下来,我们讨论如何在你的application中使用这种解决方案。

  • Linking: 这种解决方案在Scala和Java application中支持。使用如下的坐标来连接你的SBT/Maven项目(查看主要编程指南中Linking section获取更多信息)。

    1
    2
    3
    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-0-8_2.11
    version = 2.0.0
  • Programming: 在Streaming application代码中,引入KafkaUtils并如下创建一个输入DStream。

    1
    2
    3
    4
    5
    import org.apache.spark.streaming.kafka._
    val directKafkaStream = KafkaUtils.createDirectStream[
    [key class], [value class], [key decoder class], [value decoder class] ](
    streamingContext, [map of Kafka parameters], [set of topics to consume])

你还可以传递一个messageHandler到createDirectStream来访问MessageAndMetadata,MessageAndMetadata包含了关于当前message的元数据和要将它转换成的目标类型。查看API docs和examples。
在Kafka的参数中,你必须指定metadata.broker.list或bootstrap.servers。默认,它将从每个Kafka partition的最后的offset处开始消费。如果你在Kafka参数中设置auto.offset.reset为smallest,那么它将从最小的offset处开始消费。
使用KafkaUtils.createDirectStream的其他变量,你还能够从任意offset处开始消费。此外,如果你想要访问每个batch中已经消费的Kafka offset,你可以如下这么做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}

如果你想要基于Zookeeper的Kafka监控工具来展示streaming application的进度,你还可以使用这个来更新Zookeeper。
注意到HasOffsetRanges的类型转换,将只有它在第一个方法在directKafkaStream上调用完成后才会成功,不会晚于一个方法链(不知道这句是否正确)。你能够使用transform()方法来代替foreachRDD()方法来作为你调用的第一个方法以便访问offsets,然后调用进一步的Spark方法。然而,需要注意的是在任何shuffle或repartition方法之后,RDD partition和Kafka partition之间的一对一映射将不再维持,例如reduceByKey()方法或window()方法。
另一个需要注意的是,因为这个解决方案没有使用receiver,标准的receiver-related(spark.streaming.receiver.格式的配置)将不会应用到由这种解决方案生成的输入DStreams上(但是会应用到其他输入DStream上)。相反,会是用spark.streaming.kafka.的配置。一个重要的东西是spark.streaming.kafka.maxRatePerPartition,它将限制该API的从每个Kafka partition读取数据的速度(每秒message的条数)。

  • Deploying: 这跟第一种解决方案中的相同。

Spark Streaming Programming Guide

发表于 2016-08-17   |   分类于 spark

本文是Spark Streaming手册的翻译文档,会随着自己的实现进行更新,官方文档请参考。

Overview

Spark Streaming是核心Spark API的一个延伸,它对实时数据流进行可扩展的、高吞吐量的、容灾的进行处理。数据可以从很多源(如Kafka、Flume、Kinesis或TCP socket)进行提取,然后被复杂的算法组合处理,这些复杂的算法可以使用高级别的函数,如map、reduce、join和window。最后,被处理过的数据可以推出到外部文件系统、数据库和实时图表中。实际上你可以在数据流上应用Spark的机器学习和图处理。
spark streaming architecture
在内部,它如下工作。Spark Streaming接收实时的输入数据流,并将数据划分到批次中,然后在批次中数据被Spark引擎处理并生成最终的结果流。
Spark Streaming data flow
Spark Streaming提供了一个高级别的抽象,叫做discretized stream或DStream,它代表了一个连续的数据流。DStream可以从来自数据源(如Kafka、Flume和Kinesis)的输入数据流创建,也可以通过在其他DStream上应用高级别的操作来创建,一个DStream以一个RDDs序列来表示。
本指南展示了如何开始使用DStreams来编写Spark Streaming程序。你可以使用Scala、Java或Python(从Spark1.2中引入)来编写Spark Streaming程序,这些语言的代码都会在本指南中提供。你会发现tabs在本指南中随处可见,是你可以在不同语言的代码片段之间任意选择。
注意:在Python中有少量的APIs是不同或不可用的。贯穿整个指南,你会发现Python API标签高亮了这些不同。

阅读全文 »

Configuration

发表于 2016-08-09   |   分类于 spark

本文是对Spark配置的翻译,主要用于本人学习使用,原文请参考

Spark提供了三个用于对系统配置的位置:

  • Spark properties控制大多数application参数,可以通过使用SparkConf对象设置或通过Java系统属性设置。
  • Environment variables可以设置每台机器的设置,如IP地址,通过每个节点上的conf/spark-env.sh脚本。
  • Logging可以通过log4j.properties来配置。

Spark Properties

Spark属性控制大多数application设置,并且对每个application进行独立配置。这些属性可以直接在SparkConf上设置,SparkConf会传递给你的SparkContext.SparkConf,来允许你控制一些常用属性(如master的URI和application的名称等),通过set()方法达到和key-value对一样。例如,我们可以使用两个线程来初始化一个application,如下:
注意我们使用local[2]运行,意味着两个线程-表示最低的并行,这样能够帮助我们发现那些只有在分布式context上运行才会出现的bug。

1
2
3
4
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)

注意在本地模式中我们可以使用多个线程,但是在像Spark Streaming中,我们实际上要求使用多个线程,来避免任何饥饿情况的发生。
指定时间属性时需要配置时间单位,下面的格式是可以接受的:

1
2
3
4
5
6
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)

指定字节大小的属性应该配置一个大小单位,下面的格式是可以接受的:

1
2
3
4
5
6
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)

阅读全文 »

scala-sbt

发表于 2016-07-27   |   分类于 scala

本文介绍一些关于sbt的使用,关于sbt的安装,请参考sbt install

使用sbt进行编译

对于sbt来说,最简单的工程就是某个目录下只有一个scala文件。对于这种文件,在当前目录下运行sbt,进入sbt控制台后执行run即可。

vim HelloWorld.scala

1
2
3
4
5
object HelloWorld{
def main(args: Array[String]) {
println("Hello, SBT")
}
}

执行:

1
2
3
4
5
6
7
8
9
$sbt
>run
[info] Updating {file:/Users/renweiming/tmp/}tmp...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Compiling 1 Scala source to /Users/renweiming/tmp/target/scala-2.10/classes...
[info] Running HelloWorld
Hello, SBT
[success] Total time: 4 s, completed 2016-7-27 23:19:35

阅读全文 »

Unix Install

发表于 2016-07-27   |   分类于 unix

本文记录了常用软件的安装,主要针对的是CentOS,其他系统会进行特殊标注

Python 2.7.8的安装

1
2
3
yum --enablerepo=ius install python27 -y
yum --enablerepo=ius install python27-devel -y
yum --enablerepo=ius install python27-pip -y

Sbt的安装

sbt的GitHub

1
2
3
wget https://dl.bintray.com/sbt/native-packages/sbt/0.13.12/sbt-0.13.12.zip
unzip sbt-0.13.12.zip
export PATH=$SBT_HOME/bin:$PATH
阅读全文 »
1234
baimoon

baimoon

Baimoon's blog

66 日志
24 分类
30 标签
GitHub
Links
  • xrange
© 2016-07 - 2020 baimoon
由 Hexo 强力驱动
主题 - NexT.Muse