您的当前位置:首页kafka -> structuredStreaming读取kafka日志 ->自定义输出到mysql

kafka -> structuredStreaming读取kafka日志 ->自定义输出到mysql

2023-11-13 来源:哗拓教育

/** * * * @autor gaowei * @Date 2020-04-13 17:59 */object kafkaToMysqlTest { class MysqlSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] { var conn: Connection = _ override def open(partitionId: Long, epochId: Long): Boolean = { Class.forName("com.mysql.jdbc.Driver") conn = DriverManager.getConnection(url, user, pwd) true } override def process(value: Row): Unit = { val p = conn.prepareStatement("replace into test(pid,pv) values(?,?)") p.setString(1, value(0).toString) p.setLong(2, value(1).toString.toLong) p.execute() } override def close(errorOrNull: Throwable): Unit = { conn.close() } } def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("kafkaToMysqlTest").getOrCreate() val brokers = "localhost:9092" val topics = "test1" val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", topics).load() import spark.implicits._ val kafkaDf = df.selectExpr("CAST(value AS STRING)").as[String] val dataFrame = kafkaDf.groupBy("value").count(). toDF("pid","pv") //todo 将数据写到MYSQL val mysqlSink = new MysqlSink("jdbc:mysql://localhost:3306/warehouse", "root", "410410410") val query = dataFrame.writeStream.outputMode("complete").foreach(mysqlSink).start() query.awaitTermination() }}

 

kafka -> structuredStreaming读取kafka日志 ->自定义输出到mysql

标签:connect   row   tst   ast   data   mod   write   owa   main   

小编还为您整理了以下内容,可能对您也有帮助:

kafka问题求助

kafka支持的特性如下:

1、消息持久化和缓存。Kafka高度依赖文件系统来存储和缓存消息。一般的人都认为“磁盘是缓慢的”,这使得人们对“持久化结构提供具有竞争性的性能”这样的结论持有怀疑态度。实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以使之跟网络速度一样快。

2、消息读取。Kafka在读方面使用了sendfile这个高级系统函数,也即zero-copy技术,感兴趣的同学可以去阅读IBM的文章。 这项技术通过减少系统拷贝次数,极大地提高了数据传输的效率。

3、端到端的批量压缩。在许多场景下,瓶颈实际上不是CPU而是网络。这在需要在多个数据中心之间发送消息的数据流水线的情况下更是如此。当然,用户可以不需要Kafka的支持而发送压缩后的消息,但是这会导致非常差的压缩率。高效的压缩需要将多个消息一块儿压缩而不是对每一个消息进行压缩。理想情况下,这可以在端到端的情况下实现,数据会先被压缩,然后被生产者发送,并且在服务端也是保持压缩状态,只有在最终的消费者端才会被解压缩。

kafka问题求助

kafka支持的特性如下:

1、消息持久化和缓存。Kafka高度依赖文件系统来存储和缓存消息。一般的人都认为“磁盘是缓慢的”,这使得人们对“持久化结构提供具有竞争性的性能”这样的结论持有怀疑态度。实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以使之跟网络速度一样快。

2、消息读取。Kafka在读方面使用了sendfile这个高级系统函数,也即zero-copy技术,感兴趣的同学可以去阅读IBM的文章。 这项技术通过减少系统拷贝次数,极大地提高了数据传输的效率。

3、端到端的批量压缩。在许多场景下,瓶颈实际上不是CPU而是网络。这在需要在多个数据中心之间发送消息的数据流水线的情况下更是如此。当然,用户可以不需要Kafka的支持而发送压缩后的消息,但是这会导致非常差的压缩率。高效的压缩需要将多个消息一块儿压缩而不是对每一个消息进行压缩。理想情况下,这可以在端到端的情况下实现,数据会先被压缩,然后被生产者发送,并且在服务端也是保持压缩状态,只有在最终的消费者端才会被解压缩。

kafka获取数据的几种方式

一、基于Receiver的方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

如何进行Kafka数据源连接

1、在maven添加依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.10</artifactId><version>1.4.1</version></dependency>

2、scala代码

val kafkaStream = {val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"val kafkaParams = Map("zookeeper.connect" -> "zookeeper1:2181","group.id" -> "spark-streaming-test","zookeeper.connection.timeout.ms" -> "1000")val inputTopic = "input-topic"val numPartitionsOfInputTopic = 5val streams = (1 to numPartitionsOfInputTopic) map { _ =>KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)}val unifiedStream = ssc.union(streams)val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in proction.unifiedStream.repartition(sparkProcessingParallelism)}

需要注意的要点

1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。

2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。

3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。

二、基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

这种方式有如下优点:

1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

3、一次且仅一次的事务机制:

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

scala连接代码

val topics = Set("teststreaming")val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092" val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")// Create a direct stream val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)val events = kafkaStream.flatMap(line => {Some(line.toString())})

三、总结:两种方式在生产中都有广泛的应用,新api的Direct应该是以后的首选方式。

kafka获取数据的几种方式

一、基于Receiver的方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

如何进行Kafka数据源连接

1、在maven添加依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.10</artifactId><version>1.4.1</version></dependency>

2、scala代码

val kafkaStream = {val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"val kafkaParams = Map("zookeeper.connect" -> "zookeeper1:2181","group.id" -> "spark-streaming-test","zookeeper.connection.timeout.ms" -> "1000")val inputTopic = "input-topic"val numPartitionsOfInputTopic = 5val streams = (1 to numPartitionsOfInputTopic) map { _ =>KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)}val unifiedStream = ssc.union(streams)val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in proction.unifiedStream.repartition(sparkProcessingParallelism)}

需要注意的要点

1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。

2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。

3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。

二、基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

这种方式有如下优点:

1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

3、一次且仅一次的事务机制:

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

scala连接代码

val topics = Set("teststreaming")val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092" val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")// Create a direct stream val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)val events = kafkaStream.flatMap(line => {Some(line.toString())})

三、总结:两种方式在生产中都有广泛的应用,新api的Direct应该是以后的首选方式。

java该怎么自学?

自学的困难就是,不知道该从哪里开始,才怎么学,没有一个系统的学习路径,现在黑马程序员最新上线了java学习路线图,非常好的解决了一个难题,可以去搜索看一下。

一、java基础

学习任何一门编程语言,首先要学习的是基础语法,开启Java学习的第一步,当然就是深入掌握计算机基础、编程基础语法,面向对象,集合、IO流、线程、并发、异常及网络编程,这些我们称之为JavaSE基础。当你掌握了这些内容之后,你就可以做出诸如:电脑上安装的迅雷下载软件、QQ聊天客户端、考勤管理系统等桌面端软件。

JavaSE基础是Java中级程序员的起点,是帮助你从小白到懂得编程的必经之路。

在Java基础板块中有6个子模块的学习:

技术树

二、数据库

互联网最具价值的是数据,任何编程语言都需要解决数据存储问题,而数据存储的关键技术是数据库。MySQL和Oracle都是广受企业欢迎的数据库管理系统。Java程序和数据库通信的最常见技术是JDBC,Druid和C3P0。学习这些数据库技术后,可以掌握数据库运维技术、复杂业务表结构设计规范、工作中常见的SQL操作、软件数据存储等。

数据库不仅仅是Java开发工程师的必学课程,也是其他语言都需要掌握的技能。用于对交互过程中客户的数据进行存储。

该板块包括关系型数据库和非关系型数据库。

例如:MySQL、oracle、redis、MongoDB等。数据库学习完毕后,可以将数据存储到数据库中,也可以通过SQL语句从数据库中查询数据,结合Java项目可以实现动态站点的数据的保存。

技术树

三、前端技术

浏览器展示给用户看到的网页就是前端,前端有三大基础技术分别为Html、CSS、JavaScript,这些学完后,为了做出更好、更炫的交互式体验效果,我们还需要学习jQuery、ElementUI、Vue、Ajax,以及打包工具webpack。学完这些技术后,我们可以开发微信小程序、响应式网站、移动端网站、开发类似京东一样的B2B2C商城、管理后台等。

Javaweb阶段包括前端、数据库和动态网页。Javaweb是互联网项目的入门课程,是学习后面高进阶课程的基础。

首先,我们先看一下前端板块。该板块主要包括如下几个模块:

学习前端技术后,可以完成类似京东、淘宝的前端工程的编写。

技术树

四、动态网页

掌握前端技术只能做静态网站,但它页面数据一成不变,而动态网站可以根据数据库中变更的数据实现不同的内容展示,应用更广泛,因此程序员必须要学会做动态网站。使用Java做动态网站,我们需要学习Servlet、Filter、Session、Cookie、JSP、EL表达式、JSTL等做动态网站的完整知识体系,学完可研发出OA系统、内容网站、BBS等。

动态网页是中级程序员服务器端编程的基础,是高级框架学习的必备课程,后期学习的框架、服务底层都是基于动态网页技术之上的。

该板块包括Javaweb核心技术、包括Servlet、Request、Response、Cookie和Session等,通过这些技术的学习可以完成动态站点开发,可更好的完成服务器端与客户的交互,让页面的数据“动”起来,做出小型的应用系统。

技术树

五、编程强化

前面学了JavaSE基础,但它在企业级应用中程序处理业务的效率并不高、扩展差,编程强化是对JavaSE基础的加强,将针对性的提高程序处理业务的执行效率、增强程序扩展性。编程强化将加强多线程高级学习,涉及线程内存、线程通信等技术。学完以后,能增加一个中级程序员的知识储备,无论在面试过程中还是将来技术的深入打一个良好的基础。

编程强化是对解决实际问题方面做一个深入的了解和应用,是对JavaSE基础的加强,对后期自动以框架和对一些服务框架的底层理解做支撑。

编程强化板块主要包括如下几个模块:多线程高级、涉及线程内存、线程通信等;JVM优化,对JVM底层进行调优来提高项目执行效率;NIO,同步非阻塞IO来提高效率。

学习该阶段,可以对原有项目进行优化从而使程序更快更稳定。

技术树

六、软件项目管理

公司开发都是团队协同开发,为更好的掌握实际开发,我们还需要学习常用的项目管理平台、版本控制器、项目构建工具以及自动化部署工具。项目开发一定是有版本升级的,管理好项目进度和版本需要Git、Maven、Sonar这样的系统平台。学习完软件项目管理后,将掌握整个项目实际开发过程以及整个项目开发过程中所使用协同开发工具。

JavaSE基础是Java中级程序员的起点,是帮助你从小白到懂得编程的必经之路。

在Java基础板块中有6个子模块的学习:基础语法,可帮助你建立基本的编程逻辑思维;面向对象,以对象方式去编写优美的Java程序;集合,后期开发中存储数据必备技术;IO,对磁盘文件进行读取和写入基础操作;多线程与并发,提高程序效率;异常,编写代码逻辑更加健全;网络编程,应用服务器学习基础,完成数据的远程传输。

学习该阶段,可以完成一些简单的管理系统、坦克大战游戏、QQ通信等。

技术树

七、热门技术框架

Javaweb掌握后,已经具备企业中实际项目的开发能力了,但它开发效率低,代码量大,开发周期长、开发成本高。企业中广泛使用一些优秀的框架技术来解决上述问题,因此我们还需要学习框架技术,项目开发中主流的Java框架技术有SpringMVC、Spring、MyBatis、MyBatis Plus、SpringData等。这些框架技术都是一个优秀程序员所必备的技能。

使用Javaweb进行企业级开发是完全可以的,但是开发效率比较低,所以对常用的逻辑操作进行封装就形成了框架,因此框架是企业开发的入门技能。

热门框架板块主流框架有如下几个:Spring框架,占据统治地位,其生态系统涉及各个方面解决方案;MyBatis框架,使用ORM思想对数据库进行操作。

该板块学习后,就可以进行真实企业级项目开发了,做出的项目也会更加符合企业要求。

技术树

八、分布式架构

需要用到分布式微服务的技术。学习完该阶段课程,可以具备大型SOA架构和微服务架构能力,能掌握大型微服务项目必备技术和实际经验。企业发展过程中,业务量和用户量逐渐增加,为了保证系统的可用性,系统越做越复杂,研发人员增多,大家很难共同维护一个复杂的系统,往往修改部分内容,导致牵一发而动全身,所以我们需要升级系统架构,

随着互联网的发展,业务的复杂性和用户的体验性都需要提高,所以分布式架构出现了。该板块主要讲解的是分布式架构的相关解决方案。

主要包括如下模块:Dubbo,高性能的 RPC 服务发布和调用框架;SpringBoot,简化Spring应用的初始搭建以及开发过程;Spring Cloud,一系列框架的有序集合,如服务发现注册、配置中心、负载均衡、断路器、数据监控等。

该板块的学习,可以具备大型互联网项目开发的必备技术和实际经验,为进入BATJ打下基础

技术树

九、服务器中间件

在分布式系统架构中,服务与服务之间的异步通信,是非常常见的需求之一,消息中间件的诞生正是为了解决这类问题。目前市面上的主流消息中间件有RabbitMQ、RocketMQ、Kafka,我们将学习这3个消息中间件,实现分布式项目中的异步通信。学习完这些后,可以实现分布式项目的异步通信、分布式应用日志收集、分布式事务等。

中间件板块是大型互联网项目中必备的。服务中间件可以帮助各子模块间实现互相访问,消息共享或统一访问等功能。其包括远程服务框架中间件,例如阿里(Apache)的RPC框架Dubbo等;消息队列中间件,例如:阿里巴巴开源分布式中间件RocketMQ、高吞吐量消息发布和流处理服务Kafka等。

学习服务中间件是中级JavaEE工程师必要技术,也是JavaEE架构师必须精通的技术。

技术树

十、服务器技术

程序开发完成后,我们把它们打包部署到服务器中运行,所以我们需要学习常见的服务器技术,常见的服务器有Linux和Window server,Linux性能高,是当前主流。我们写好的项目需要用一个软件运行起来,这个软件叫web容器,我们需要在服务器上安装web容器来发布项目,当前主流的web容器有tomcat、jetty、nginx、undertow。

不管是使用原生Javaweb进行开发,还是使用框架进行开发,项目最终需要对外发布才能供全世界的人访问到,而服务器板块就可以解决这个问题,所以服务器是项目发布的必要技术。该板块包括虚拟化和web应用服务器的学习,主要包括如下几个模块:Vmware,虚拟机软件;Linux,专门用于服务器的系统;Nginx,集群部署时反向代理服务器;Tomcat,项目发布时主要使用的服务器。

该板块学习后,我们就可以把开发好的项目发布到服务器中,然后供你的小伙伴远程访问了,超酷!

技术树

十一、容器技术

具备了服务器操作系统及web容器,我们就可以部署单机的站点,在分布式系统中,几十上百的服务,如果使用单机这种部署方式,会投入很高的人力,同时出错的几率也大。所以服务器虚拟化技术Docker也称为如今的必备技术了,Docker可以帮助运维人员实行快速部署,批量维护.使用Kubernetes实现自动化部署、大规模可伸缩、应用容器管理。

容器化技术是近两年超级火的一个专题,通过容器化技术可以对环境进行打包,方便移植,大大提高了开发效率。该板块包括容器化技术Docker和其平台管理引擎Kubernetes,其中,Docker 是一个开源的应用容器引擎,可以打包应用以及依赖包到一个可移植的镜像中,然后发布到任何流行的Linux或Windows 机器上,也可以实现虚拟化。而Kubernetes是一个开源的,用于管理云平台中多个主机上的容器化的应用,Kubernetes的目标是让部署容器化的应用简单并且高效。通过该板块的学习,你可以通过上述技术快速搭建环境,节省开发时间,提高开发效率。

技术树

十二、业务解决方案

企业开发中会遇到一些通用的业务场景,诸如:搜索引擎、缓存、定时任务、工作流、报表导出、日志管理、系统监控等,那么这些通用的解决方案也有现成优秀的免费开源中间件,可供使用。诸如:ElasticSearch、Lucene、Solr、redis、MongoDB、slf4J、ECharts、Quartz、POI等。业务解决方案课程的业务方案和技术难点,解决了企业开发中90%以上的痛点和难点。

虽然我们已经具备了基础技术和高阶技术,但是要想与企业开发相接轨,还需要对实际项目的业务解决方案进行探究。而此版块就是在实际业务场景中的真实解决方案集合,常用的业务解决方案有如下:搜索业务场景解决方案、日志收集与分析场景解决方案、工作流引擎场景解决方案、任务调度场景解决方案、地图开发平台场景解决方案、支付开放平台场景解决方案、图表可视化场景解决方案。通过分析实际业务来学习这个解决方案技术集,完全可以达到中级甚至高级工程师水平。

技术树

数据分析需要掌握哪些知识?

数据分析要掌握主流的数据分析方法。

1、事件分析

可以根据用户在企业APP、网站、小程序等平台上的操作记录或是行为日志,来确定用户在平台上各个板块之间行为的规律和特点,通过商业智能BI数据分析,研究出用户的内心需求,对板块内容进行优化调整,一般会涉及浏览页面、点击元素、访问板块等。

2、热力图分析

和事件分析类似,热力图一般指用户访问企业网站、APP和小程序时,会在一些元素和板块进行停留,根据这些在元素和板块上的点击次数、点击率、访问次数、访问人数等,通过商业智能BI以高亮图形形式进行显示,可以方便识别用户行为,优化逻辑。

数据分析-派可数据商业智能BI

3、留存分析

留存一般在运营工作中比较常见,可以用来衡量企业提供的产品和服务是否对用户有足够的吸引力,让用户在接触或使用产品和服务后,能够继续保持活跃,成为忠实用户,一般会将次日留存率、7日留存率、次月留存率等作为标准,以商业智能作为分析工具。

4、对比分析

一般用到对比分析,通常是在选定的时间区域内,对比业务在不同情况下的差异,分析出业务是进行了增长还是发生了缩减的情况。

例如,上图中2021年9月的销量相比8月的销量有所减少,这时候就要深入分析为什么环比销量会减少,可以考虑调取今年3月和去年3月的产品生产数量,看看是不是生产环比下降,导致销量较少。同理,还可以把供应链、经销商、人流量等等都拿进行对比分析,确认到底是什么影响了销量。

数据分析-派可数据商业智能BI

显示全文