Nina's Blog

Good Good Study, Day Day Up


  • Home

  • Archives

kafka学习笔记一:整体了解kafka

Posted on 2024-01-10 | In kafka

kafka

消息队列

因为mq很常见嘛所以就不多介绍啦,看一下消息队列的两种模式

点对点模式

消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费后,queue中不再有存储,所以消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对于一个消息而言,只有一个消费者可以消费。(其实kafka的每个consumer group内可以理解为是点对点模式)

消息/订阅模式

消息生产者(发布)将消息发布到topic中,同时有多个消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。也就是kafka使用的模式

基础架构

image-20221008131302682

这张图感觉看过好多好多遍。。。

  • Producer:消息生产者(发布)将消息发布到topic中,同时有多个消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
  • Consumer:消息消费者,向kafka broker取消息的客户端
  • Consumer Group:消费者组,由多个consumer组成。消费者组每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者属于某个消费者组,即消费者组是逻辑上的一个订阅者
  • Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成,一个broker可以容纳多个topic
  • Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic
  • Partition:为了实现扩展性,一个非常大的topic分为多个partition,partition可以理解为物理分区,是最小的存储单元,掌握着一个 Topic 的部分数据。
  • Replication:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower

Partition

作为kafka的存储核心,还是单独捞出来写一下吧

Partition 中的每条记录都会被分配一个唯一的序号,称为 Offset(偏移量),Offset 是一个递增的、不可变的数字,由 Kafka 自动维护,当一条记录写入 Partition 的时候,它就被追加到 log 文件的末尾,并被分配一个序号,作为 Offset。

消息的顺序性:一个 Topic 如果有多个 Partition 的话,那么从 Topic 这个层面来看,消息是无序的。但单独看 Partition 的话,Partition 内部消息是有序的。所以,一个 Partition 内部消息有序,一个 Topic 跨 Partition 是无序的。如果强制要求 Topic 整体有序,就只能让 Topic 只有一个 Partition。

一个 Kafka 集群由多个 Broker(就是 Server) 构成,每个 Broker 中含有集群的部分数据,Kafka 把 Topic 的多个 Partition 分布在多个 Broker 中。

好处:

  • 增加吞吐量,把partition放在不同的broker里,那么整个topic的吞吐就不会受限于一台机器的i/o性能
  • 增加consumer消费能力,把不同的consumer分配去消费不同的partition,Consumer 的多个实例就可以连接不同的 Broker,大大提升了消息处理能力。
  • 增加数据备份,一个partition在不同的broker分为leader和follower去备份数据

存储

看一下kafka的存储机制:

image-20221008142532471

其实就是分片和索引的机制,每个index文件里面都会以当前segment的第一条消息的offset命名,并且顺序向下记录,本质上就是实现了一套稀疏索引。

要查找一个offset的数据的时候,先去遍历每个segment,定位到该offset在哪个segment里面,再在segment里面查找到对应offset的数据

Producer

生产者负责的是发送消息到对应的partition,所以需要确认的有以下件事

一:发送数据到哪个partition

二:partition对应的broker是哪个

三:如何保证数据已经发送成功,需不需要重试

分区器

其实分区器前面还有拦截器和序列化器,分别做了一些数据处理和数据序列化的逻辑,之后分区器用来确定需要发送的分区,kafka提供了三种分区方式:

  1. 指定partition的情况下,直接将指定的值作为partition值
  2. 没有指定partition值但有key的情况下,将key的hash 值与 topic 的 partition 数进行取余得到 partition 值
  3. 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。

broker信息

这个后面会提到,kafka的集群是借助zk管理的,也就是说每个brocker都会向zk注册。而kafka集群中会选举一个broker作为controller,controller会负责整个集群的主题管理(包括主题创建,删除,分区分配,增加和重分配)等操作。

Kafka使用了一种称为”Metadata Request”的机制来获取Broker的元数据信息。当Producer需要发送消息到指定Partition时,它会向Kafka集群发送一个Metadata Request请求,请求包含了目标Topic和Partition的信息。

Kafka集群中的一个Broker(通常是Controller)会接收到这个请求,并返回包含了目标Partition所在的Broker的元数据信息的响应。这个元数据信息包括了Broker的地址(IP和端口),Producer可以根据这些信息来建立与目标Broker的连接。

producer本地有一个属性会保存集群中broker的列表,但是Kafka的元数据信息是动态变化的,因为集群中的Broker和Partition可能会发生变化。因此,Producer通常会定期或在需要时发送Metadata Request来获取最新的元数据信息,以确保消息发送到正确的Broker和Partition。

数据可靠性

为保证producer发送的数据,即可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement 确认收到),如果producer收到ack,就会进行下一轮发送,否则会重新发送数据。

那么什么时候发送ack呢?

首先producer发送的一定是leader节点,leader节点收到数据之后会有两个操作,一个是落盘,一个是同步数据到follower,所以这里kafka提供了三种不同的ack返回方案配置:

  • 0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接受到还没有写入磁盘就已经返回,当broker由故障的时候有可能丢失数据
  • 1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果follower同步成功之前leader故障,那么将丢失数据
  • -1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果follower同步成功之后,broker发送ack之前,leader发生故障,那么会操成数据重复

0和-1,一个是at most once,最多只发一次,数据不重复,但是会丢失,一个是at least once,数据保证不会丢失但是会重复,所以用容易丢失的方案则要考虑数据是否重要,用at least once的方案就需要考虑数据幂等性

至于all的方案,zk我们知道,是只要保证有一半以上的follower同步成功就ok,但是kafka选择的则是全部follower都同步成功才可以,因为如果用zk的方案,2n+1,那么就会有n台机器是增加出来的应付故障的冗余数据,zk的znode节点数据相当小,但是kafka这种侧重数据存储的逻辑会造成n台机器的数据冗余,成本过高

ISR

那么如果其中有一台机器出问题了同步不了怎么办?ack就一直卡在那里么?

Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间未向leader 同步数据 , 则该 follower将 被 踢 出 ISR,该时间阈值由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。

数据幂等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
> Kafka 通过 Producer Id(生产者标识符)和 Sequence Number(序列号)来保证消息不会被重复发送。以下是 Kafka 如何实现这一点的工作原理:
>
> Producer Id(PID): 每个 Kafka 生产者在初始化时都会获得一个唯一的 Producer Id。Producer Id 是一个持久性标识符,通常与生产者的客户端实例相关联。Kafka 会跟踪每个生产者的 Producer Id。
>
> Sequence Number(序列号): 每个请求(消息)都包含一个单调递增的序列号。序列号从1开始,每次递增1。序列号用于标识请求的顺序。
>
> Producer Id 和 Sequence Number 的存储: Kafka 将 Producer Id 和 Sequence Number 存储在每个分区的分区日志中。这个存储允许 Kafka 跟踪每个分区的最新序列号。
>
> 消息发送过程: 当生产者发送一条消息到 Kafka 时,消息会携带生产者的 Producer Id 和序列号。Kafka 会根据 Producer Id 和序列号来验证消息的幂等性和顺序性。
>
> Kafka 服务器的处理: Kafka 服务器会在接收到消息后,首先检查 Producer Id 和序列号是否已经在该分区的分区日志中出现过。如果这个 Producer Id 和序列号已经存在,说明消息已经被处理,Kafka 将丢弃重复的消息。如果 Producer Id 和序列号是新的,Kafka 会接受消息并更新分区的最新序列号。
>
> 重试机制: 如果生产者因某种原因未收到来自 Kafka 的确认(ACK)并怀疑消息未被成功发送,它会重新发送相同的消息。由于消息中包含了 Producer Id 和序列号,Kafka 可以识别重复的消息并再次将其过滤掉。
>
> 通过这个机制,Kafka 确保了消息不会被重复发送。每条消息都有唯一的 Producer Id 和序列号,这使得 Kafka 能够在接收消息时检查重复,并且保持消息的幂等性和顺序性。这对于确保数据的可靠性和一致性非常重要,尤其在分布式系统中。
>

Consumer

消费模式

pull模式:消费者从broker中主动拉取数据

好处:

  1. 消费者可以根据自己的消费速率来控制拉取速度
  2. 消费者可以自由决定拉取的数据量,比如我们之前一批一百个拉数据处理,可以避免高频少量数据写入ck
  3. 可以通过增加消费者的数量来增加吞吐量(我理解要结合partition数量来看)

上面讲到Kafka采用pull模式获取数据, 在没有数据时会进行空转, Kafka针对这一点采用在消费者消费数据时传入一个时长参数 timeout, 如果没消息是, 消费者等待该 time

分区分配策略

我们已知一个consumer group内采用的是点对点的策略,也就是说每个consumer消费的内容是不一样的,kafka采用的是把partition分配给consumer的逻辑,那么把哪个分区分配到哪个consumer呢?

Range

对于每个topic,Range是对每个Topic而言的(即一个Topic一个Topic的分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按字母顺序进行排序,然后用partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区,如果除不尽,那么前几个消费者线程将会多消费一个分区。

这样对于单个topic来说,分配是尽可能平衡的,但是如果这个consumer消费了多个topic呢?

假如我们有两个topic T1,T2,分别有10个分区,最后的分配结果将会是这样

1
2
3
C1-0:T1(0,1,2,3) T2(0,1,2,3)
C2-0:T1(4,5,6) T2(4,5,6)
C2-1:T1(7,8,9) T2(7,8,9)

如上,只针对一个topic而言,C1-0消费者多消费1个分区影响不是很大。如果有 N 多个 topic,那么针对每个 topic,消费者 C1-0 都将多消费 1 个分区,topic越多,C1-0 消费的分区会比其他消费者明显多消费 N 个分区。这就是 Range 范围分区的一个很明显的弊端了

RoundRobin

总觉得很耳熟不知道为啥

RoundRobinAssignor策略的原理是将消费者组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询的方式逐个将分区以此分配给每个消费者。

使用RoundRobinAssignor策略有两个前提条件必须满足:

  1. 同一个消费者组里面的所有消费者的num.streams(消费者线程数)必须相等
  2. 每个消费者订阅的主题必须相同
    1. 如果消费者订阅的主题不一样的话,如果某个消费者没有订阅某个topic,那么分配的时候会跳过这个消费者,会导致分配的不均匀

StickyAssignor

代码逻辑本身很复杂,没仔细看,比起前面的分配原则可以看一下区别

sticky这个词的意思是黏性的,其实也就是说重分配的时候,尽量把原来就在这个consumer下的分区保留,这样可以减少销毁和重新建立连接的开销

Offset

消费者offset

每个消费者在消费每个分区的时候都会记录自己当前消费的offset,那么如果consumer出现故障,或者是服务重新发布之后,需要从之前的offset开始消费,所以这个offset不能被consumer自己保存,自己保存的话服务重建就不见了

kafka本身保存了消费者offset,可以想象成一个kv

key是由consumer group id + topic + partition 共同构成的复合key,value是offset

这个offset在kafka 0.9版本的时候是保存在zk的,但是zk这玩意儿频繁写不是什么好事,所以新版把这个值写入了kafka的一个内部topic,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

partition水位

partition水位也就是partition当前的最大可读offset的值,这里引入两个概念HW(high watermark)和LEO(log end offset)

hw就是当前partition最大可读的offset

image-20221010155119686

当follower挂了,follower发生故障后会被临时提出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步,等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。

如果是leader挂了呢?leader发生故障后,会从ISR种选出一个新的leader,之后,为保证多个副本之间数据一致性,其余的follower会先将各自log文件高于HW的部分截掉,然后从新的leader同步数据。

虽然我不大确定leader的选举机制,但是i guess是leo最大的

那么hw是怎么同步的?

  1. 首先,生产者不停地向Leader写入数据,这时候Leader的LEO可能已经达到了10,但是HW依然是0,两个Follower向Leader请求同步数据,他们的值都是0。

  2. 然后,消息还在继续写入,Leader的LEO值又发生了变化,两个Follower也各自拉取到了自己的消息,于是更新自己的LEO值,但是这时候Leader的HW依然没有改变。

  3. 此时,Follower再次向Leader拉取数据,这时候Leader会更新自己的HW值,取Follower中的最小的LEO值来更新。
  4. 之后,Leader响应自己的HW给Follower,Follower更新自己的HW值,因为又拉取到了消息,所以再次更新LEO,流程以此类推。

读写

缓冲和缓存

缓存区是内存和cpu之间的存储区,用来存储cpu多次从内存读取到的数据,多次读取同一变量,就会将该变量放到缓存区中,当再次调用时直接从缓存区取,不从内存中取,当内存中该变量改变时,这种情况会导致内存不可见问题;这样做的目地是比访问内存更快。具体可以自己看高速缓存

缓冲区是内存中的存储区, 缓冲区分为内核缓冲区和用户缓冲区;功能不同。

内核缓冲区是网络传输,文件io,控制台输入输入等操作的“中转站”,为不可见内存,即用户不能直接使用。

内核缓冲区是为了暂时存储磁盘数据,因为每次从磁盘读数据开销非常大,所以比方说文件读取,最大的性能瓶颈就卡在了读磁盘上

  1. 数据预读
    1. 数据预读指的是,当程序发起 read() 系统调用时,内核会比请求更多地读取磁盘上的数据,保存在缓冲区,以备程序后续使用。这种数据的预取策略其实就是基于局部性原理
    2. 因此当我们向内核请求读取数据时,内核会先到内核缓冲区中去寻找,如果命中数据,则不需要进行真正的磁盘 I/O,直接从缓冲区中返回数据就行了;如果缓存未命中,则内核会从磁盘中读取请求的 page,并同时读取紧随其后的几个 page(比如三个),如果文件是顺序访问的,那么下一个读取请求就会命中之前预读的缓存(当然了,预读算法非常复杂,这里只是一个简化的逻辑)。
  2. 延时回写
    1. 延时回写指的是,当程序发起 write() 系统调用时,内核并不会直接把数据写入到磁盘文件中,而仅仅是写入到缓冲区中,几秒后(或者说等数据堆积了一些后)才会真正将数据刷新到磁盘中。对于系统调用来说,数据写入缓冲区后,就返回了。
    2. 延迟往磁盘写入数据的最大一个好处就是,可以合并更多的数据一次性写入磁盘,把小块的 I/O 变成大块 I/O,减少磁盘处理命令次数,从而提高提盘性能。
    3. 另一个好处是,当其它进程紧接着访问该文件时,内核可以从直接从缓冲区中提供更新的文件数据(这里又是充当 Cache 了)。

用户缓冲区则是可见内存,对用户来说可以直接使用。

因为内核缓冲区是不可见的,所以如果用户需要读内核缓冲区的数据,则需要切换到内核态,把数据读出来再切换到用户态放进堆里,涉及到上下文切换和堆栈变化。所以后来就设计了用户缓冲区,和内核缓冲区的操作是一样一样的,就是作用不一样,一个是为了减少磁盘io,一个是为了减少上下文切换

零拷贝

那么说完上面的先看一下正常来说查看一个磁盘文件的流程:

Step1: 从磁盘中读取数据到内核缓冲区

Step2: cpu从缓存中拷贝到用户缓冲区

image.png

写同理,用户缓冲区->socket缓冲区->网卡

对于kafka来说整个IO的过程需要进行两次DMA拷贝,两次CPU拷贝,四次上下文切换。总共四次拷贝,四次切换。这个代价确实有些大。

那么零拷贝技术倒也不是说真的就不拷贝了,而是尽量根据不同场景简化文件读写的拷贝次数,从而尽量减少上下文切换,提高数据传输效率。

歪个楼,虽然不是kafka的零拷贝,但是也是一种零拷贝技术:mmap+write,本质上就是利用mmap将内核缓冲区的的地址直接映射到用户缓冲区,具体可以看一下这个博客:https://baijiahao.baidu.com/s?id=1769186849807925293&wfr=spider&for=pc

img

而kafka用到的就是文中的sendfile()方法,基于java.nio包下的FileChannel.transferTo()实现零拷贝。简单解释就是不需要走一遍用户进程和用户缓冲区的处理,直接把数据copy到socket缓冲区

img

这种方式下用户程序不能对数据进行修改,而只是单纯地完成了一次数据传输过程。整个拷贝过程会发生 2 次上下文切换,1 次 CPU 拷贝和 2 次 DMA 拷贝。

page cache

Kafka 对消息的存储和缓存严重依赖于文件系统。人们对于“磁盘速度慢”具有普遍印象,事实上,磁盘的速度比人们预期的要慢的多,也快得多,这取决于人们使用磁盘的方式。

Kafka重度依赖底层OS提供的page cache功能。当上层有写操作时,OS只是将数据写入到page cache,同时标记page属性为dirty。当读操作发生时,先从page cache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。

个人理解page cache就是上面提到的内核缓冲区

zookeeper

在这里插入图片描述

Controller

这就又要提到前面写过的controller了

怎么选出来的呢controller,就是和zk帮忙选master一样,所有broker都会去注册controller节点,最小的那个就是当前的controller

  1. 主题管理:这里的主题管理,就是指控制器帮助我们完成对 Kafka 主题的创建、删除以及分区增加的操作。换句话说,当我们执行kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。
  2. 分区重分配:主要是指,kafka-reassign-partitions 脚本提供的对已有主题分区进行细粒度的分配功能。这部分功能也是控制器实现的。
  3. Preferred 领导者选举:主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。后面会详细介绍
  4. 集群成员管理:这是控制器提供的第 4 类功能,包括自动检测新增 Broker、Broker 主动关闭及被动宕机。
    1. 这种自动检测是依赖于zk的Watch 功能和 ZooKeeper 临时节点组合实现的。比如,控制器组件会利用Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。
    2. 侦测 Broker 存活性则是依赖于zk的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。
  5. 控制器的最后一大类工作,就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

节点注册

看上面的图就能看出来,broker注册在broker/ids节点下,topic注册在broker/topics下

消费者组注册在consumers/{group_id}节点下,下面有三个子节点分别是:

ids:消费者

owners:该消费者组消费的topic

offsets:记录每个topic每个partition的offset

image-20221009184409066

分区重分配

当集群中增加或者broker宕机的时候,需要让分区副本再次进行合理的分配,也就是所谓的分区重分配。

  1. 生成一个分区分配方案
  2. 先通过控制器为每个分区添加新副本(增加副本因子) , 新的副本将从分区的 leader副本那里复制所有的数据。根据分区的大小不同, 复制过程可能需要花一些时间, 因为数据是通过网络复制到新副本上的。
  3. 在复制完成之后,控制器将旧副本从副本清单里移除(恢复为原先的副本因子数)。注意在重分配的过程中要确保有足够的空间。

Leader和Follower

先说一下replica的概念,就是对同一个分区,为了保证数据不丢失,会给数据搞多个副本用于存储和同步,也就是leader和follower,不过和zk不一样,kafka的follower只负责同步,所有的数据生产和消费都是走leader的

常见的有以下几种情况会触发Partition的Leader Replica选举:

  1. Leader Replica 失效:当 Leader Replica 出现故障或者失去连接时,Kafka 会触发 Leader Replica 选举。
  2. Broker 宕机:当 Leader Replica 所在的 Broker 节点发生故障或者宕机时,Kafka 也会触发 Leader Replica 选举。
  3. 新增 Broker:当集群中新增 Broker 节点时,Kafka 还会触发 Leader Replica 选举,以重新分配 Partition 的 Leader。
  4. 新建分区:当一个新的分区被创建时,需要选举一个 Leader Replica。
  5. ISR 列表数量减少:当 Partition 的 ISR 列表数量减少时,可能会触发 Leader Replica 选举。当 ISR 列表中副本数量小于 Replication Factor(副本因子)时,为了保证数据的安全性,就会触发 Leader Replica 选举。
  6. 手动触发:通过 Kafka 管理工具(kafka-preferred-replica-election.sh),可以手动触发选举,以平衡负载或实现集群维护

leader选举

用zk的master原则,brokers注册顺序最小的就会变成leader

leader负载均衡

理论上我们肯定希望leader分布在不同的broker上,这样所有的broker都可以承担起一部分i/o,不会导致某个broker负载过高

Preferred Replica是Kafka中的一个概念,用于指定每个分区的首选副本(Preferred Replica)。首选副本是指在进行Leader选举时,优先选择作为分区Leader的副本。

zookeeper学习笔记一:整体了解zk

Posted on 2024-01-10 | In distribution

CAP

写在一切开始的cap,所有分布式系统都需要关注的内容

① C:Consistency,一致性,数据一致更新,所有数据变动都是同步的。

② A:Availability,可用性,系统具有好的响应性能。

③ P:Partition tolerance,分区容错性。以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择,也就是说无论任何消息丢失,系统都可用。

该理论已被证明:任何分布式系统只可同时满足两点,无法三者兼顾。 因此,将精力浪费在思考如何设计能满足三者的完美系统上是愚钝的,应该根据应用场景进行适当取舍。

Consistency

① 强一致性(strong consistency)。任何时刻,任何用户都能读取到最近一次成功更新的数据。

② 单调一致性(monotonic consistency)。任何时刻,任何用户一旦读到某个数据在某次更新后的值,那么就不会再读到比这个值更旧的值。也就是说,可获取的数据顺序必是单调递增的。

③ 会话一致性(session consistency)。任何用户在某次会话中,一旦读到某个数据在某次更新后的值,那么在本次会话中就不会再读到比这个值更旧的值。会话一致性是在单调一致性的基础上进一步放松约束,只保证单个用户单个会话内的单调性,在不同用户或同一用户不同会话间则没有保障。

④ 最终一致性(eventual consistency)。用户只能读到某次更新后的值,但系统保证数据将最终达到完全一致的状态,只是所需时间不能保障。

⑤ 弱一致性(weak consistency)。用户无法在确定时间内读到最新更新的值。

基本结构

zk其实算是一个分布式管理系统,管理分布式节点的配置维护,可用性协调等等等等。打个最熟悉的比方hdfs里面,如果master节点挂了往往要再选举出一个master来,那么这个master节点挂了是怎么感知的呢,正常理解肯定是定时ping,但是利用zk的话就可以注册节点并实时感知到master节点的状态,并且可以很方便地进行选举,从而减少了大量集群内部通信压力

实际上,hdfs,kafka,hBase等等底层的调度都用了zk

目录

img

zk的目录结构和文件系统非常相似,有父节点,父节点下面可以创建子节点。

每个节点用路径来表示,路径必须是绝对且唯一的

znode

每个节点被称为一个znode,每个Znode由3部分组成:

① stat:此为状态信息, 描述该Znode的版本, 权限等信息

② data:与该Znode关联的数据

③ children:该Znode下的子节点

每个znode节点都有以下属性,后面会具体讲

img

znode有以下几个特征:

数据访问

  1. ZooKeeper中的每个节点存储的数据要被原子性的操作。也就是说读操作将获取与节点相关的所有数据,写操作也将替换掉节点的所有数据。另外,每一个节点都拥有自己的ACL(访问控制列表),这个列表规定了用户的权限,即限定了特定用户对目标节点可以执行的操作。
  2. ZooKeeper中的节点有两种,分别为临时节点和永久节点。节点的类型在创建时即被确定,并且不能改变。

    1. 临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话(Session)结束,临时节点将被自动删除,当然可以也可以手动删除。虽然每个临时的Znode都会绑定到一个客户端会话,但他们对所有的客户端还是可见的。另外,ZooKeeper的临时节点不允许拥有子节点。
    2. 永久节点:该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除。
  3. 当创建Znode的时候,用户可以请求在ZooKeeper的路径结尾添加一个递增的计数。这个计数对于此节点的父节点来说是唯一的,它的格式为”%10d”(10位数字,没有数值的数位用0补充,例如”0000000001”)。当计数值大于232-1时,计数器将溢出。

功能和原理

基本操作

每个znode的基本操作有九个

znode的基本操作

watcher

简单来说就是监听一个节点的变化,如果发生变化的话就会触发监听事件并且把消息传给监听的客户端,监听是一个一次性事件,结束之后需要重新设置watcher

watch类型

ZooKeeper所管理的watch可以分为两类:

① 数据watch(data watches):getData和exists负责设置数据watch
② 孩子watch(child watches):getChildren负责设置孩子watch

我们可以通过操作返回的数据来设置不同的watch:

① getData和exists:返回关于节点的数据信息
② getChildren:返回孩子列表

因此

① 一个成功的setData操作将触发Znode的数据watch

② 一个成功的create操作将触发Znode的数据watch以及孩子watch

③ 一个成功的delete操作将触发Znode的数据watch以及孩子watch

img

原理

1
2
3
4
5
6
7
8
9
10
11
12
public class WatchManager {
// Logger
private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);

// watcher表
private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<String, HashSet<Watcher>>();

// watcher到节点路径的映射
private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();
}

WatcherManager类用于管理watchers和相应的触发器。watchTable表示从节点路径到watcher集合的映射,而watch2Paths则表示从watcher到所有节点路径集合的映射

注册:客户端调用getData之后,服务器收到请求,就会将数据节点的路径以及 ServerCnxn(远程通信信息) 信息存储到 WatchManager 的 watchTable 和 watch2Paths 中。而同时客服端收到返回之后,也会在本地注册watcher到一个watchmanager

触发:客户端调用setData之后,服务器收到请求,修改完数据之后调用triggerWatch方法,服务器从之前的 watchManager 中获得 watchers,然后一个个调用 process 方法发送通知到客户端。客户端收到请求后转为watchedEvent进入waitingEvent队列,最终取出并执行processevent

权限管理(ACL)

创建每个znode都会产生一个acl列表,列表中每个ACL 包括:

  1. 权限perms

    1. Create 允许对子节点Create 操作
    2. Read 允许对本节点GetChildren 和GetData 操作
    3. Write 允许对本节点SetData 操作
    4. Delete 允许对子节点Delete 操作
    5. Admin 允许对本节点setAcl 操作
  2. 验证模式scheme

    1. Digest:Client 端由用户名和密码验证,譬如user:pwd
    2. Digest: Client 端由主机名验证,譬如localhost
    3. Ip:Client 端由IP 地址验证,譬如172.2.0.0/24
    4. World :固定用户为anyone,为所有Client 端开放权限
  3. 具体内容expression:Ids

Server 收到Client 发送的操作请求(除exists、getAcl 之外),需要进行ACL 验证:对该请求携带的Author 明文信息加密,并与目标节点的ACL 信息进行比较,如果匹配则具有相应的权限,否则请求被Server 拒绝。

会话

一旦客户端与一台ZooKeeper服务器建立连接,这台服务器就会为该客户端创建一个新的会话。正常来说会话是长期的,保证客户端与zk服务器的正常连接,直到客户端主动断开连接,但是有两个意外情况:

  1. 会话超时,每个会话仍旧有一个超时时间,如果服务器在超时时间段内没有收到任何请求,则相应的会话会过期。一旦一个会话已经过期,就无法重新打开,并且任何与该会话相关联的短暂znode都会丢失。

    解决方法:只要一个会话空闲超过一定时间,都可以通过客户端发送ping请求(也称为心跳)保持会话不过期。ping请求由ZooKeeper的客户端库自动发送,所以我们使用的时候不用考虑会话过期问题,同时ping也可以用来检测连接的服务器是否有问题

  2. 服务器故障:上面说到的,ping的时候可以监测服务故障(所以ping的时间间隔不应该太长)

    ZooKeeper客户端可以自动地进行故障切换,切换至另一台ZooKeeper服务器。并且关键的一点是,在另一台服务器接替故障服务器之后,所有的会话和相关的短暂Znode仍然是有效的。

    但是另一台服务器接替故障服务器之后,之前的watcher和重连过程中znode发生的变化都会丢失,需要客户端重新设置watcher

集群

分布式系统的单点故障:主节点挂了

正常来说,如果master节点挂了,会重新选举一个新master节点,怎么判断主节点挂了呢?

– 传统方式是采用一个备用节点,这个备用节点定期给当前主节点发送ping包,主节点收到ping包以后向备用节点发送回复Ack ,当备用节点收到回复的时候就会认为当前主节点还活着,让他继续提供服务。

但是如果备用节点到主节点的网络有问题会怎么样?

– 备用节点会自动升级成主节点,那从节点就乱了,一部分汇报给了主节点,一部分汇报给了备用节点

最开始提到的,hdfs利用zk来感知节点故障并且进行选举,这个逻辑其实是把“故障转移到了ZooKeeper身上”

那么zk本身就需要保证自己不会出现上面描述的故障

先说一下zk是怎么做的:在master节点下注册比如两个节点,因为zk节点单调递增的属性,就可以用id最小的节点作为当前master节点,同时对其他节点都注册一个对比自己小一的节点的exist watcher。如果当前master节点挂了那么master节点会被删除,那么比master大一的节点会收到事件,并且自动启动master脚本称为master节点

但是如果zk自己只有一个机器,并且zk这个机器挂了呢?

– 那这个分布式系统会自动变成双master系统

所以zk需要首先有可用性和恢复性的保证

ZooKeeper运行模式

“独立模式“(standalone mode),即只有一个ZooKeeper服务器。这种模式较为简单,比较适合于测试环境,甚至可以在单元测试中采用,但是不能保证高可用性和恢复性。

“复制模式“(replicated mode),ZooKeeper运行于一个计算机集群上,这个计算机集群被称为一个”集合体“(ensemble)。

ZooKeeper集群

zk集群

ZooKeeper通过复制来实现高可用性,只要集合体中半数以上的机器处于可用状态,它就能够提供服务。例如,在一个有5个节点的集合体中,每个Follower节点的数据都是Leader节点数据的副本,也就是说我们的每个节点的数据视图都是一样的,这样就可以有五个节点提供ZooKeeper服务。并且集合体中任意2台机器出现故障,都可以保证服务继续,因为剩下的3台机器超过了半数。

ZK集群中每个Server,都保存一份数据副本。Zookeeper使用简单的同步策略,通过以下两条基本保证来实现数据的一致性:

① 全局串行化所有的写操作

​ 读请求,由每台Server数据库的本地副本来进行服务。

​ 写请求,需要通过一致性协议来处理:所有写请求都要被转发到一个单独的Server,称作Leader。ZK集群中其他Server 称作Follower,负责接收Leader发来的提议消息,并且对消息转发达成一致。

② 保证同一客户端的指令被FIFO执行(以及消息通知的FIFO)

集群可用性

为什么是半数以上机器呢?

其实所谓的集群是否可用,隐含的意思就是集群是否能选举出一个leader

一般情况下zk的选举用的是 majority quorums的方案:只有当超过一半的follower达成一致的时候,才会选举出leader->所以建议zk的节点是奇数

那么我们再回头看一下前面说的单点故障下多个master的问题,我们也可以称为脑裂问题,就是因为通信等问题导致一部分节点失去leader的消息,重新选举一个leader之后,网络恢复出现两个leader的问题。那么在zk的场合下我们假设遇到了这个问题,机房A有三台机器,机房B有两台,当机房A和B断连之后:

首先leader和follower之间是有心跳检测的,所以follower会很快判断自己和leader断连了并且进入选举

  1. leader是机房A的,那么机房B选不出leader,就会变成不可用的follower,直到网络恢复
  2. leader是机房B的,那么断开之后机房A会选出一个新leader,可以看一下最后的epoch位介绍,每次选一个新leader epoch位都会加一,那么当网络恢复之后,机房A和B的leader在通信的时候会判断epoch是否一致,epoch小的那个会自动重置

所以保证了zk集群不会出现脑裂问题,但是在场景2当网络恢复之前也会出现两个leader,所以还需要增加其他方案来保证不会出现脑裂问题

zk与cap

① 顺序一致性

来自任意特定客户端的更新都会按其发送顺序被提交。也就是说,如果一个客户端将Znode z的值更新为a,在之后的操作中,它又将z的值更新为b,则没有客户端能够在看到z的值是b之后再看到值a(如果没有其他对z的更新)。

② 原子性

每个更新要么成功,要么失败。这意味着如果一个更新失败,则不会有客户端会看到这个更新的结果。

③ 单一系统映像

一 个客户端无论连接到哪一台服务器,它看到的都是同样的系统视图。这意味着,如果一个客户端在同一个会话中连接到一台新的服务器,它所看到的系统状态不会比 在之前服务器上所看到的更老。当一台服务器出现故障,导致它的一个客户端需要尝试连接集合体中其他的服务器时,所有滞后于故障服务器的服务器都不会接受该 连接请求,除非这些服务器赶上故障服务器。

④ 持久性

一个更新一旦成功,其结果就会持久存在并且不会被撤销。这表明更新不会受到服务器故障的影响。

zab协议

ZAB原名叫做Zookeeper Atomic Broadcast,是一种支持崩溃恢复的原子广播协议。

背景:所有的读都是在follower中处理的,但是写操作都会被转发到leader手上由leader统一处理

消息广播

所有的事务必须由一个全局唯一的服务器协调处理,这个服务器叫做Leader,其他的服务器称为Follower。Leader服务器将事务转换成Proposal(提议),并将该提议分发给所有的Follower。Leader等待Follower反馈,一旦超过半数正确反馈后,Ledaer将会再次向所有的Follower服务器发送Commit消息,要求将前一个Proposal提交。

在广播过程中,Leader服务器会首先为这个事务分配一个全局单调递增的唯一ID,我们称之为事务ID(即ZXID)。Leader服务器会为每一个Follower服务器给自分配一个单独队列FIFO。将要广播的事务放到队列中,然后进行消息的发送。每一个Follower接受到事务后,会将其以事务日志的形式写道磁盘,成功后返回ACK消息,当Leader接收到半数以上的ACK后,就会广播一个Commit消息,通知Follower进行事务提交。

正常的数据更新就是用消息广播来处理的,但是如果leader结点崩溃的话,消息广播并不能解决问题

崩溃恢复

当Leader服务与过半的Follower失联,那么就会进入崩溃恢复模式。为了保证集群的正常运行,需要选举出新的Leader,这种叫做崩溃恢复,它不仅要让Leader知道自己是Leader节点,还要让其他的Follower知道他是Leader。

当崩溃恢复选取节点的时候会首先比较事务id(zxid),举个例子,当Leader发送Commit给其他节点的时候,假如有三个节点A、B、C,其中A收到了,当发送B的时候Ledaer突然崩了,那么新选举出来的节点应该是A,因为A的事务id最大。

恢复原则:

① 我们绝不能遗忘已经被deliver的消息,若一条消息在一台机器上被deliver,那么该消息必须将在每台机器上deliver。

方式:其实主要是借助leader选取的是最高zxid这个方案,当 Follower 连接上 Leader 之后,Leader 服务器会根据自己服务器上最后被提交的 ZXID 和 Follower 上的 ZXID 进行比对,比对结果要么回滚,要么和 Leader 同步。

② 我们必须丢弃已经被skip的消息。

方式:在我们的实现中,Zxid是由64位数字组成的,低32位用作简单计数器。高32位是一个epoch。每当新Leader接管它时,将获取日志中Zxid最大的epoch,新Leader Zxid的epoch位设置为epoch+1,counter位设置0。用epoch来标记领导关系的改变,并要求Quorum Servers 通过epoch来识别该leader,避免了多个Leader用同一个Zxid发布不同的提议。

paxos和zab对比:https://www.cnblogs.com/wuxl360/p/5817646.html

伸缩性

增加observer:简单来说,observer和follower一样作为读服务器,承载读请求,同时把写请求转发给leader

区别:在写请求的时候leader会发起proposal并收集投票,observer不参与接受propose和返回ack的过程,只接收最后的commit结果并同步本地数据

目的:

  1. 增加可伸缩性

我们现在可以加入很多 Observer 节点,而无须担心严重影响写吞吐量。虽然通知commit的阶段的开销还是会随着observer节点增长而增加,但是这里的开销相对来说非常低

  1. 增加广域网覆盖能力

可以把leader和follower这种需要大量通信交互的放在尽量近的网络内,保证往返时延不会太高,同时把observer放在需要访问 ZooKeeper 的任意数据中心中。这样,投票协议不会受到数据中心间链路的高时延的影响,性能得到提升。投票过程中 Observer 和领导节点间的消息远少于投票服务器和领导节点间的消息。这有助于在远程数据中心高写负载的情况下降低带宽需求。

刷题碎碎念二:原地交换

Posted on 2023-12-13 | In leetcode

正常交换的时候我们需要用一个临时存储tmp: tmp = a; a = b; b = tmp

怎么原地存储呢?很简单的想法就是用一个差: a = a - b; b = a + b; a = -( a - b )

如果用位运算呢

所以就变成了

a = a ^ b

b = a ^ b

a = b ^ a

利用xor的思路想就很简单很简单

但是本质就是个智力游戏,没有实际用途。。。还不如用一个tmp来的有可读性

找工作TBD

Posted on 2023-12-13 | Edited on 2024-01-24
  1. 看im逻辑(重要非常重要
  2. 去隔壁分析云看一眼新系统
  3. csapp
    1. SRAM?
    2. 补码(看过很多次了再看一次)// DONE
    3. 浮点数计算(不重要但是不影响好奇)
    4. 多路复用,I/O多路复用
    5. 处理器pipeline设计
  4. mysql,mongo和clickhouse
  5. 蹭公司的游戏开发框架内部课程看两眼(你司的招牌就是im和游戏了临走总得学一学)
  6. 不能因为是通信科班出来的就不复习包括telnet在内的各种计算机网络谢谢
  7. 统一配置相关
  8. go prof:现在应该勉强能看懂一些了,可以试着写一个案例来优化
  9. 边缘节点
  10. go micro
  11. mq
  12. 规则引擎
  13. flink spark etl

  14. 网络 – 去公司前看掉

  15. trpc-go 和微服务架构
  16. mongo,es和ck
  17. etl和规则引擎
  18. 过一下项目
  19. 分布式和分布式事务
  20. spark和hdfs
  21. syncmap
  22. rabbit和kafka区别

剑指offer memo

Posted on 2023-12-08 | Edited on 2024-01-24 | In leetcode

数组中重复数字

题目:https://leetcode.com/problems/find-the-duplicate-number/submissions/1114957310/

解法:

  1. hashmap,最容易想到的解法,但是空间复杂度O(n)
  2. 能改变nums数组的背景下将值对应index的值变为-,重复遇到时值为负,但是会改变数组值

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    func findDuplicate(nums []int) int {
    for _,num := range nums {
    key := int(math.Abs(float64(num)))
    if nums[key] < 0 {
    return key
    }
    nums[key] = -nums[key]
    }
    return 0
    }
  3. 二分法:在n个 1-n的数字中有重复,这个问题可以理解为,在数字 1到n/2 和 n/2到n两个范围中,重复数字落下的区间所拥有的数组数字量一定比一半要更多,用这个方式可以对 1-n的数字二分最后找到答案,时间复杂度O(nlogn)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    func findDuplicate(nums []int) int {
    left := 1
    right := len(nums)
    for {
    if(left >= right) {
    return left
    }
    mid := (left+right) / 2
    var leftNum,rightNum int
    for _,num := range nums {
    if num > mid && num <= right {
    rightNum++
    } else if num >= left && num <= mid {
    leftNum++
    }
    }

    if leftNum <= mid - left + 1 {
    left = mid + 1
    } else {
    right = mid
    }
    }
    return 0
    }
  4. 环形链表:数字和对应下标值可以组成一个链表(当然是在数字本身是1-n的背景下),比如数组 [1,3,2,2] 组成的链表就是 1->3->2->2…,最后的2就会变成一个环形链表,可以用快慢指针找环形链表的方案求解,时间复杂度O(n),空间复杂度O(1) 完美

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    func findDuplicate(nums []int) int {
    var fast,slow,start int
    for {

    slow = nums[slow]
    fast = nums[nums[fast]]
    if fast == slow {
    break
    }
    }
    for {
    if start == slow {
    break
    }
    slow = nums[slow]
    start = nums[start]
    }
    return start
    }

二维数组中的查找

题目:https://leetcode.com/problems/search-a-2d-matrix-ii/submissions/1116268925/

没啥老朋友+1,就是从左到右从上到下递增的二维数组怎么找数字

从右上和左下开始的思路是一样的,以从右上开始打个比方,就是

  • 如果matrix[a][b] > target,那么对所有x,y, x > a && y > b,都比target要大,也就是整个右下角,所以可以跳过右下角的遍历
  • 如果matrix[a][b] < target,那么对所有x,y, x < a && y < b,都比target要小,所以不需要关心左上角
  • 从右上开始的逻辑就是,右上角已经走过了,之后只需要对左下角接着遍历就ok了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func searchMatrix(matrix [][]int, target int) bool {
if len(matrix) == 0 {
return false
}
m := len(matrix)
x := 0
y := len(matrix[0])-1
for {
if x == m || y < 0 {
break
}
if matrix[x][y] == target {
return true
}

if matrix[x][y] > target {
y --
} else {
x ++
}
}
return false
}

重建二叉树

题目:https://leetcode.com/problems/construct-binary-tree-from-preorder-and-inorder-traversal/

老朋友加一,同时知道前序遍历(或后序遍历)和中序遍历就能够唯一确定一颗二叉树,而前序和后序则不能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func buildTree(preorder []int, inorder []int) *TreeNode {
if len(preorder) == 0 {
return nil
}
cur := &TreeNode{
Val: preorder[0],
}
var inIndex int
for i,in := range inorder {
if in == preorder[0] {
inIndex = i
}
}
cur.Left = buildTree(preorder[1:inIndex+1], inorder[0:inIndex])
cur.Right = buildTree(preorder[inIndex+1:], inorder[inIndex+1:])
return cur
}

用两个栈实现堆

题目:https://leetcode.com/problems/implement-queue-using-stacks/submissions/1116341974/

没什么好讲的+1,写着玩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
type MyQueue struct {
In []int
Out []int
}


func Constructor() MyQueue {
return MyQueue{}
}


func (this *MyQueue) Push(x int) {
this.In = append(this.In, x)
}


func (this *MyQueue) Pop() int {
var ret int
if len(this.Out) == 0 {
this.Out = append(this.Out, this.In...)
this.In = []int{}
}
ret = this.Out[0]
this.Out = this.Out[1:]
return ret
}


func (this *MyQueue) Peek() int {
if len(this.Out) == 0 {
this.Out = append(this.Out, this.In...)
this.In = []int{}
}
return this.Out[0]
}


func (this *MyQueue) Empty() bool {
return len(this.Out) == 0 && len(this.In) == 0
}


/**
* Your MyQueue object will be instantiated and called as such:
* obj := Constructor();
* obj.Push(x);
* param_2 := obj.Pop();
* param_3 := obj.Peek();
* param_4 := obj.Empty();
*/

斐波那契数列

题目:

  • https://leetcode.com/problems/fibonacci-number/
  • https://leetcode.com/problems/climbing-stairs/description/
1
2
3
4
5
6
func fib(n int) int {
if n == 0 || n == 1 {
return n
}
return fib(n-1) + fib(n-2)
}

多写两步就是爬楼梯,爬楼梯的本质方法其实就是下面:

1
2
3
4
5
6
func climbStairs(n int) int {
if n == 1 || n == 2 {
return n
}
return climbStairs(n-1) + climbStairs(n-2)
}

但是显而易见这是个递归,时间复杂度O(2^n),这么简单的问题用递归太消耗时间了(虽然确实很简单),climbStairs(n-1) 和 climbStairs(n-2) 一定遇见过很多次重复计算,如果我们把爬到每层楼梯的计算结果都存下来呢?

下面就是用空间换时间之后的结果:

1
2
3
4
5
6
7
8
9
func climbStairs(n int) int {
steps := make([]int, n)
steps[0] = 1
steps[1] = 2
for i := 2; i<n; i++ {
steps[i] = steps[i-2] + steps[i - 1]
}
return steps[n-1]
}

瞬间就变成了时间O(n),空间O(n)的东西了

最后我们发现其实每次只关心step[n-1]和step[n-2],所以就可以简化空间:

1
2
3
4
5
6
7
8
9
10
func climbStairs(n int) int {
one := 1
res := 1
for i := 1; i<n; i++ {
two := res
res += one
one = two
}
return res
}

时间复杂度O(n),空间复杂度O(1)

最后总结一下,dp本质上就是用空间换时间,主要的流程分为以下三步:

  1. 写出递归公式 f(n) = f(n-k)+f(n-w) (简单打个比方)
  2. 用空间换时间方法写出dp解法
  3. 优化空间

旋转数组的最小数字

题目:https://leetcode.com/problems/find-minimum-in-rotated-sorted-array/

解法就是显而易见的二分法,写二分法的时候我为了看起来舒服所以个人习惯喜欢写递归,这次把迭代递归两种都写一下:
递归:

1
2
3
4
5
6
7
8
9
10
11
func findMin(nums []int) int {
if len(nums) == 1 {
return nums[0]
}
mid := (0 + len(nums) - 1 ) / 2
if nums[len(nums) - 1] < nums[mid] {
return findMin(nums[mid+1:])
} else {
return findMin(nums[0:mid+1])
}
}

迭代:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func findMin(nums []int) int {
left := 0
right := len(nums) - 1
for {
if left >= right {
return nums[left]
}
mid := (left + right) / 2
if nums[right] < nums[mid] {
left = mid + 1
} else {
right = mid
}
}
return 0
}

矩阵中的路径

题目:https://leetcode.com/problems/word-search/solutions/2498848/simple-backtracking-with-go/

典型回溯法求解,一开始天真地写了个外部函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func exist(board [][]byte, word string) bool {
for i,row := range board {
for j,_ := range row {
res := find(board,word,0,i,j)
if res == true {
return res
}

}
}
return false
}

func find(board [][]byte, word string, wordIndex,x,y int) bool{
m := len(board)
n := len(board[0])
if x < 0 || x >= m || y < 0 || y >= n {
return false
}
if board[x][y] != []byte(word)[wordIndex] {
return false
}
if wordIndex == len(word) - 1 {
return true
}


tmp := board[x][y]
board[x][y] = byte(0)

wordIndex++
res := false
res = find(board,word,wordIndex,x,y + 1) || find(board,word,wordIndex,x,y - 1) || find(board,word,wordIndex,x-1,y) || find(board,word,wordIndex,x+1,y)

board[x][y] = tmp
return res
}

然后发现go可以搞闭包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func exist(board [][]byte, word string) bool {
m := len(board)
n := len(board[0])
var backtrack func(x,y,wordIndex int) bool
backtrack = func(x,y,wordIndex int) bool{
if x < 0 || x >= m || y < 0 || y >= n {
return false
}
if board[x][y] != []byte(word)[wordIndex] {
return false
}
if wordIndex == len(word) - 1 {
return true
}


tmp := board[x][y]
board[x][y] = byte(0)

wordIndex++
res := false
res = backtrack(x,y + 1, wordIndex) || backtrack(x,y - 1, wordIndex) || backtrack(x-1,y, wordIndex) || backtrack(x+1,y,wordIndex)

board[x][y] = tmp
return res
}
for i,row := range board {
for j,_ := range row {
if backtrack(i,j,0) {
return true
}

}
}
return false
}

二进制中1的个数

题目: https://leetcode.com/problems/number-of-1-bits/solutions/4340903/o-log-n-c-python-java-explained/

解法:
简单遍历取余,时间复杂度 O(logn)

1
2
3
4
5
6
7
8
9
10
11
12
13
func hammingWeight(num uint32) int {
var time int
for {
if num == 0 {
break
}
if num %2 == 1 {
time++
}
num /= 2
}
return time
}

可以写成位运算的形式(从计算效率来说位运算肯定比除法好很多)(为什么?):

1
2
3
4
5
6
7
8
9
10
11
func hammingWeight(num uint32) int {
var time int
for {
if num == 0 {
break
}
time+= int(num%2)
num >>= 1
}
return time
}

数值的整数次方

题目:https://leetcode.com/problems/powx-n/

解法:

  1. 顺着乘起来,时间复杂度是O(n)
  2. 假设二进制 n = 1001, $a^n$ = $a^8*a$,可以把时间复杂度降低到 $log_2n$
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func myPow(x float64, n int) float64 {
var powVal float64
powVal = 1
newN := int(math.Abs(float64(n)))
for {
if newN==0 {
break
}
if newN & 1 != 0 {
powVal *= x
}
x *= x
newN = newN >> 1
}
if n >= 0 {
return powVal
} else {
return 1/powVal
}
}

在O(1)时间内删除链表节点

题目:https://leetcode.com/problems/delete-node-in-a-linked-list/submissions/1117061591/

简单来说考验的就是对链表的熟悉程度

  1. 一开始想到的方案是把链表整个往左移,所以写了个循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func deleteNode(node *ListNode) {
for {
fmt.Printf("node:%v",node.Next)
if node.Next != nil {
node.Val = node.Next.Val
if node.Next.Next == nil {
node.Next = nil
break
}
node = node.Next

} else {
node = nil
break
}

}
}
  1. 后来发现自己就是个弱智,直接把下一个节点跳过不就行了?题目还强调了那么多次保证不会给末端节点,不审题唉
    1
    2
    3
    4
    func deleteNode(node *ListNode) {
    node.Val = node.Next.Val
    node.Next = node.Next.Next
    }

删除链表中重复的节点

题目:https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/

自己写的时候多绕了一层,判断是否重复,然后再挨个判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func deleteDuplicates(head *ListNode) *ListNode {
newHead := &ListNode{
Val: -101,
Next : head,
}
cur := newHead
before := -101
for {
if cur == nil {
break
}
if cur.Next == nil {
break
}
if cur.Next.Val == before {
cur.Next = cur.Next.Next
} else if cur.Next.Next != nil && cur.Next.Val == cur.Next.Next.Val {
before = cur.Next.Val
cur.Next = cur.Next.Next.Next

} else {
cur = cur.Next
}
}
return newHead.Next
}

也可以把每个节点内加个判断重复的循环就行了,性能差不多但是代码可读性和理解会更简单一点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func deleteDuplicates(head *ListNode) *ListNode {
newHead := &ListNode{
Val: -101,
Next : head,
}
cur := head
before := newHead
for {
if cur == nil {
break
}

var count int

for {
if cur.Next == nil ||cur.Next.Val != cur.Val {
break
}
cur = cur.Next
count++
}
if count == 0 {
before.Next = cur
before = cur
}
cur = cur.Next
}
before.Next = nil
return newHead.Next
}

正则表达式

题目:https://leetcode.com/problems/regular-expression-matching/

经典dp题,做了好一阵子

首先写出dp公式,一开始的时候写出来是这样的:

  • 假设 i:字符串index,j:正则index
  • 当p[j] 不等于 ‘*’ 时,dp[i][j] = dp[i-1][j-1] && (s[i] = p[j] || p[j] = ‘.’)
  • 当p[j] 等于 ‘*’ 时,dp[i][j] = (s[i] = p[j-1] || p[j-1] = ‘.’) && (dp[i-1][j-1] || dp[i-1][j])

美滋滋写完发现报错了,忽略了一种场景:input= “aab”, regx = “c*a*b”,也就是”c*“不是”c+”,没有字符也是ok的

所以在这个的基础上针对输入加了第0位,也就是空字符串的正则表达匹配,最后的公式等于:

  • 假设 i:字符串index,j:正则index
  • 当p[j] 不等于 ‘*’ 时,dp[i][j] = dp[i-1][j-1] && (s[i] = p[j] || p[j] = ‘.’)
  • 当p[j] 等于 ‘*’ 时,dp[i][j] = (s[i] = p[j-1] || p[j-1] = ‘.’) && (dp[i-1][j-1] || dp[i-1][j]) || dp[i][j-2]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func isMatch(s string, p string) bool {
var dps [][]bool
m := len(s)
n := len(p)
dps = make([][]bool, m+1)
for d:= 0; d< m+1; d++ {
dps[d] = make([]bool, n+1)
}

dps[0][0] = true
for i:=0; i<=m; i++ {
for j := 1;j<=n; j++ {
if p[j-1] != '*' {
dps[i][j] = i > 0 && dps[i-1][j-1] && (s[i-1] == p[j-1] || p[j-1] == '.')
} else {
dps[i][j] = (i > 0 && (s[i-1] == p[j-2] || p[j-2] == '.') && (dps[i-1][j] || dps[i-1][j-1])) || dps[i][j-2]
}
}
}
return dps[m][n]
}

合法数值字符串

题目:https://leetcode.com/problems/valid-number/

解法:

  1. 本能用了有限状态自动机来做:

    • 状态0:字符串开始和结束
    • 状态1:符号
    • 状态2:数字
    • 状态3:e和E
    • 状态4:前面没有出现过数字的.
    • 状态5:前面出现过数字的.
      然后构建了一个状态转移矩阵,不过因为其实需要处理的特殊场景比较多(比如E和.只能出现一次),所以还是出现了不少if else判断,其实也可以把这些if else都转成其他状态,但是懒了这样也挺好
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      func isNumber(s string) bool {
      trans :=[6][6]int{
      {0,1,1,0,1,0},
      {0,0,1,0,1,0},
      {1,0,1,1,1,1},
      {0,1,1,0,0,0},
      {0,0,1,0,0,0},
      {1,0,1,1,0,0},
      }
      var hasE,hasPoint,hasVal bool
      var preStatus int
      for _,val := range s{
      var status = 0
      if val == '-'|| val == '+'{
      status = 1
      } else if val >= '0' && val <= '9' {
      status = 2
      hasVal = true
      } else if val == '.' {
      if hasPoint {
      return false
      }
      if hasVal {
      status = 5
      } else {
      status = 4
      }
      hasPoint = true

      } else if val == 'E' || val == 'e' {
      status = 3
      if hasE {
      return false
      }
      hasE = true
      hasPoint = true
      } else {
      return false
      }
      if trans[preStatus][status] == 0 {
      return false
      }
      preStatus = status
      }
      status := 0
      return trans[preStatus][status] == 1
      }
  2. 正则,当然是个很不错的解法 const regex Solution::pattern("[+-]?(?:\\d+\\.?\\d*|\\.\\d+)(?:[Ee][+-]?\\d+)?") 但是y1s1对正则真的不是特别熟练。。。

按奇偶排序数组

题目:https://leetcode.com/problems/sort-array-by-parity/

非常典型的双指针题,遍历都是一遍过没有问题
解法:

  1. 分配一个新的数组存,空间复杂度会变成O(n)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    func sortArrayByParity(nums []int) []int {
    start := 0
    end := len(nums) - 1
    res := make([]int, end+1)
    for _,num:= range nums {
    if num%2 == 0 {
    res[start] = num
    start++
    } else {
    res[end] = num
    end--
    }
    }
    return res
    }
  2. 原地交换,空间复杂度O(1)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    func sortArrayByParity(nums []int) []int {
    start := 0
    end := len(nums) - 1
    for {

    for {
    if nums[start]%2 == && start < end{
    start++
    } else {
    break
    }
    }
    for {
    if nums[end]%2 == 1 && end > start{
    end--
    } else {
    break
    }
    }
    if start >= end {
    break
    } else {
    tmp := nums[start]
    nums[start] = nums[end]
    nums[end] = tmp
    start++
    end--
    }
    }
    return nums
    }

链表中倒数第n个节点

题目:https://leetcode.com/problems/remove-nth-node-from-end-of-list/

没啥,前后指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func removeNthFromEnd(head *ListNode, n int) *ListNode {
newHead := &ListNode{
Next: head,
}
start := newHead
end := newHead
for i:=0; i<n; i++ {
start = start.Next
}
for {
if start == nil {
break
}
if start.Next != nil {
start = start.Next
end = end.Next
} else {
break
}
}
end.Next = end.Next.Next
return newHead.Next
}

链表环入口

题目:https://leetcode.com/problems/linked-list-cycle-ii/
解法见第一题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Definition for singly-linked list.
* type ListNode struct {
* Val int
* Next *ListNode
* }
*/
func detectCycle(head *ListNode) *ListNode {
fast := head
slow := head
var step int
for {
if fast == nil || slow == nil || fast.Next == nil || slow == nil || fast.Next.Next == nil {
return nil
}
if step == 0 || fast != slow {
fast = fast.Next.Next
slow = slow.Next
step++
} else {
break
}
}
// 肯定有环,找入口就行
start := head
for {
if start != slow {
start = start.Next
slow = slow.Next
} else {
break
}
}
return start
}

反转链表

题目:https://leetcode.com/problems/reverse-linked-list/solutions/3211778/using-2-methods-iterative-recursive-beats-97-91/

老朋友了+1,没什么好说的,写了两种解法:

  1. 循环迭代,时间复杂度O(n),空间复杂度 O(1)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    func reverseList(head *ListNode) *ListNode {
    newHead := &ListNode{
    Next:nil,
    }
    for {
    if head != nil {
    old := newHead.Next
    newHead.Next = head
    head = head.Next
    newHead.Next.Next = old
    } else {
    break
    }
    }
    return newHead.Next
    }
  2. 递归:时间复杂度O(n),空间复杂度 O(n)(函数调用空间)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    func reverseList(head *ListNode) *ListNode {
    if head == nil || head.Next == nil {
    return head
    }
    newHead := reverseList(head.Next)
    head.Next.Next = head
    head.Next = nil
    return newHead
    }

合并两个排序链表

题目:https://leetcode.com/problems/merge-two-sorted-lists/

不优雅但好理解,好理解最重要~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func mergeTwoLists(list1 *ListNode, list2 *ListNode) *ListNode {
newHead := &ListNode{
}
head := newHead
for {
if list1 == nil && list2 == nil {
break
}
if list1.Val < list2.Val {
head.Next = list1
list1 = list1.Next
} else {
head.Next = list2
list2 = list2.Next
}
head = head.Next
}
var left *ListNode
if list1 != nil {
left = list1
} else {
left = list2
}
for {
if left != nil {
head.Next = left
head = head.Next
left = left.Next
} else {
break
}
}
return newHead.Next
}

判断是否子树

题目:https://leetcode.com/problems/subtree-of-another-tree/

题解:

我自己写的时候思路是把判断是子树往下走的逻辑和找子树的逻辑混在一块写了,用isSub来判断是在判断子树部分是否一致还是在找子树,这样不是很好看懂还有一堆判断逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func isSubtree(root *TreeNode, subRoot *TreeNode) bool {

return findSub(root,subRoot,false)
}

func findSub (root *TreeNode, subRoot *TreeNode, isSub bool) bool {
if root == nil && subRoot == nil {
return true
}
if root == nil || subRoot == nil {
return false
}
var res bool
if root.Val == subRoot.Val {
res = findSub(root.Left,subRoot.Left, true) && findSub(root.Right,subRoot.Right, true)
} else if isSub {
return false

}
if !isSub {
res = res || findSub(root.Left,subRoot, false) || findSub(root.Right,subRoot, false)
}
return res
}

把判断是否一致的逻辑单独拆出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func isSubtree(root *TreeNode, subRoot *TreeNode) bool {
if root == nil || subRoot == nil {
return root == subRoot
}
return isIdentical(root,subRoot) || isSubtree(root.Left, subRoot) || isSubtree(root.Right, subRoot)
}

func isIdentical (root *TreeNode, subRoot *TreeNode) bool {
if root == nil || subRoot == nil {
return root == subRoot
}
var res bool
if root.Val == subRoot.Val {
res = isIdentical(root.Left,subRoot.Left) && isIdentical(root.Right,subRoot.Right)
} else {
return false
}
return res
}

翻转二叉树

题目:https://leetcode.com/problems/invert-binary-tree/description/

解法:

  1. dfs递归,说实话递归没啥好写的,就是个普通的递归就结束了(所以说人类爱写递归不是没有原因的,太好写也太好懂了虽然占地方)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func invertTree(root *TreeNode) *TreeNode {
    if root == nil {
    return nil
    }
    var right,left *TreeNode
    right = invertTree(root.Left)
    left = invertTree(root.Right)
    root.Left = left
    root.Right = right
    return root
    }
  2. bfs,用队列来遍历二叉树,以后经常会这么做,习惯就好

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    func invertTree(root *TreeNode) *TreeNode {
    if root == nil {
    return nil
    }
    var stack []*TreeNode
    stack = append(stack, root)
    for {
    l := len(stack)
    if l == 0 {
    break
    }
    head := stack[l - 1]
    stack = stack[:l-1]
    tmp := head.Right
    head.Right = head.Left
    head.Left = tmp
    if head.Right != nil {
    stack = append(stack, head.Right)
    }
    if head.Left != nil {
    stack = append(stack, head.Left)
    }
    }
    return root
    }

对称的二叉树

题目:https://leetcode.com/problems/symmetric-tree/

当然肯定有用堆来做的解法,一样一样的,偷懒不想写了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func isSymmetric(root *TreeNode) bool {
if root == nil {
return true
}
return symmetric(root.Left, root.Right)
}

func symmetric(left *TreeNode, right *TreeNode) bool {
if left == nil || right == nil {
return left == right
}
if left.Val == right.Val {
return symmetric(left.Left, right.Right) && symmetric(left.Right, right.Left)
} else {
return false
}
}

顺时针打印矩阵

题目:https://leetcode.com/problems/spiral-matrix/

题解:设置上下左右边界,按照边界进行遍历就行,时间复杂度O(mn),空间复杂度O(1),因为只需要记录四个边界

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func spiralOrder(matrix [][]int) []int {
if len(matrix) == 0 {
return []int{}
}
var left,right,up,down int
var res []int
right = len(matrix[0]) - 1
down = len(matrix) - 1
for {
if left > right || up > down {
break
}

for i:= left; i <= right; i++ {
res = append(res, matrix[up][i])
}
up++
for i:= up; i<= down; i++ {
res = append(res, matrix[i][right])

}
right--
if up <= down {
for i:= right; i >= left; i-- {
res = append(res, matrix[down][i])

}
down--
}
if left <= right {
for i:= down; i>= up; i-- {
res = append(res, matrix[i][left])

}
left++
}
}
return res
}

min stack

题目:https://leetcode.com/problems/min-stack/submissions/1117973116/

解法:

  1. 辅助栈。时间复杂度O(1),空间复杂度O(n),简单来说就是搞个辅助栈minStack,利用栈先进后出的特性保存当前栈的最小值(一开始把栈和堆搞反了怎么也做不出来)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    type MinStack struct {
    stack []int
    minStack []int
    }


    func Constructor() MinStack {
    return MinStack{}
    }


    func (this *MinStack) Push(val int) {
    this.stack = append(this.stack,val)

    if len(this.minStack) > 0 && this.minStack[len(this.minStack) - 1] < val {
    this.minStack = append(this.minStack, this.minStack[len(this.minStack) - 1])
    } else {
    this.minStack = append(this.minStack, val)
    }
    }


    func (this *MinStack) Pop() {
    this.stack = this.stack[:len(this.stack)-1]
    this.minStack = this.minStack[0:len(this.minStack)-1]
    }


    func (this *MinStack) Top() int {
    return this.stack[len(this.stack)-1]
    }


    func (this *MinStack) GetMin() int {
    return this.minStack[len(this.minStack)-1]
    }
  2. 空间复杂度为O(1)的辅助栈:受到了一点题解的启发,用和最小值的差值而不是当前值来保存,不过说实话这么难以阅读的逻辑不要也罢

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    type MinStack struct {
    stack []int
    min int
    }


    func Constructor() MinStack {
    return MinStack{
    min:-1,
    }
    }


    func (this *MinStack) Push(val int) {
    if len(this.stack) == 0 {
    this.min = val
    }
    diff := val - this.min
    this.stack = append(this.stack,diff)

    if diff < 0{
    this.min = val
    }
    }


    func (this *MinStack) Pop() {
    val := this.stack[len(this.stack)-1]
    this.stack = this.stack[:len(this.stack)-1]
    if val < 0 {
    this.min -= val
    }

    }


    func (this *MinStack) Top() int {
    val := this.stack[len(this.stack)-1]
    if val <= 0 {
    return this.min
    } else {
    return val + this.min
    }
    }


    func (this *MinStack) GetMin() int {
    return this.min
    }

判断栈压入弹出序列合法

题目:https://leetcode.com/problems/validate-stack-sequences/

题解:其实就是模拟栈来处理

  1. 空间复杂度O(1)但是修改原数组,因为不是很清楚go的删除节点是不是要O(n)一下,所以非常不建议
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func validateStackSequences(pushed []int, popped []int) bool {
var i,j int
for {

if i >= len(pushed) {
return i == 0
}
if i == -1 || pushed[i] != popped[j] {
i++
} else {
tmp := pushed[i+1:]
pushed = append(pushed[:i], tmp...)
j++
i--
}
}
return true
}
  1. 使用辅助数组来模拟栈的操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    func validateStackSequences(pushed []int, popped []int) bool {
    var help []int
    var i int
    for _,val := range pushed {
    help = append(help, val)
    for {
    if len(help) > 0 && popped[i] == help[len(help) - 1] {
    help = help[:len(help) - 1]
    i++
    } else {
    break
    }
    }
    }
    return len(help) == 0
    }

分行从上到下打印二叉树

题目:https://leetcode.com/problems/binary-tree-level-order-traversal/submissions/1118009830/

题解:用堆就结束了,时间和空间复杂度都是O(n),除了go不提供堆这种数据结构之外没有什么难点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func levelOrder(root *TreeNode) [][]int {
var heap []*TreeNode
var res [][]int
if root == nil {
return res
}
heap = append(heap, root)
for {
if len(heap) == 0 {
break
}
tmp := heap
heap = []*TreeNode{}
var resTmp []int
for _,node := range tmp {
resTmp = append(resTmp, node.Val)
if node.Left != nil {
heap = append(heap, node.Left)
}
if node.Right != nil {
heap = append(heap, node.Right)
}
}
res = append(res, resTmp)
}
return res
}

分行之字打印二叉树

题目:https://leetcode.com/problems/binary-tree-zigzag-level-order-traversal/

和上题差不多就是个欢乐堆bfs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func zigzagLevelOrder(root *TreeNode) [][]int {
var res [][]int
var heap []*TreeNode
if root == nil {
return res
}
heap = append(heap, root)
inOrder := true
for {
if len(heap) == 0 {
break
}
tmp := heap
heap = []*TreeNode{}
var tmpRes []int
for i:= len(tmp) -1; i>=0; i-- {
node := tmp[i]
tmpRes = append(tmpRes, node.Val)
if inOrder {
if node.Left != nil {
heap = append(heap, node.Left)
}
if node.Right != nil {
heap = append(heap, node.Right)
}
} else {
if node.Right != nil {
heap = append(heap, node.Right)
}
if node.Left != nil {
heap = append(heap, node.Left)
}
}
}
inOrder = !inOrder
res = append(res, tmpRes)

}
return res
}

判断二叉搜索树后续遍历合法

(leetcode要钱所以去了牛客)题目:https://www.nowcoder.com/practice/a861533d45854474ac791d90e447bafd?tpId=13&tqId=11176&tPage=1&rp=1&ru=/ta/coding-interviews&qru=/ta/coding-interviews/question-ranking

二叉搜索树:根节点内所有左子树值\<root,所有右子树值> root

解法递归本身没什么问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func VerifySquenceOfBST( sequence []int ) bool {
if len(sequence) == 0 {
return false
}
// write code here
var checkRight func(left int, right int) bool
checkRight = func(left int, right int) bool {
if left >= right {
return true
}
rootVal := sequence[right]
var i = right - 1
for ; i>= left; i-- {
if sequence[i] < rootVal {
break
}
}
for j:=left;j<=i;j++ {
if sequence[j] >= rootVal {
return false
}
}
return checkRight(left, i) && checkRight(i+1,right-1)
}
return checkRight(0,len(sequence) - 1)
}

二叉树中和为某一值的路径

题目:https://leetcode.com/problems/path-sum-ii/

题解:

  1. 简单dfs

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    func pathSum(root *TreeNode, targetSum int) [][]int {
    var res [][]int
    if root == nil {
    return res
    }

    var checkVal func(root *TreeNode, targetSum int, before []int)
    checkVal = func(root *TreeNode, nowSum int, before []int) {
    nowSum += root.Val
    before = append(before, root.Val)
    if root.Left == nil && root.Right == nil {
    if nowSum == targetSum {
    res = append(res, append([]int{}, before...))
    }
    return
    }
    if root.Left != nil {
    checkVal(root.Left, nowSum, before)
    }
    if root.Right != nil {
    checkVal(root.Right, nowSum, before)
    }
    return
    }
    checkVal(root,0,[]int{})
    return res
    }
  2. 利用回溯法优化路径存储,只要有一个path就行不需要传参

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    func pathSum(root *TreeNode, targetSum int) [][]int {
    var res [][]int
    if root == nil {
    return res
    }
    var path []int
    var checkVal func(root *TreeNode, targetSum int)
    checkVal = func(root *TreeNode, nowSum int) {
    nowSum += root.Val
    path = append(path, root.Val)
    if root.Left == nil && root.Right == nil {
    if nowSum == targetSum {
    res = append(res, append([]int{}, path...))
    }
    } else {
    if root.Left != nil {
    checkVal(root.Left, nowSum)
    }
    if root.Right != nil {
    checkVal(root.Right, nowSum)
    }
    }
    path = path[:len(path) - 1]
    return
    }
    checkVal(root,0)
    return res
    }

随机链表的复制

题目:https://leetcode.com/problems/copy-list-with-random-pointer/submissions/1118858301/

解法:

  1. 用hash保存已经创建的链表(面试答不出来就用这个方法),空间复杂度是O(n),时间复杂度O(n)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    func copyRandomList(head *Node) *Node {
    if head == nil {
    return nil
    }
    nodeMap := map[*Node]*Node{}
    cur := &Node{
    Val: head.Val,
    }
    newNode := &Node {
    Next: cur,
    }
    nodeMap[head] = cur
    nodeMap[nil] = nil
    for {
    if head == nil {
    break
    }

    if next,ok := nodeMap[head.Next]; ok {
    cur.Next = next
    } else {
    next :=&Node{
    Val: head.Next.Val,
    }
    nodeMap[head.Next] = next
    cur.Next = next
    }

    if rand,ok := nodeMap[head.Random]; ok {
    cur.Random = rand
    } else {
    rad :=&Node{
    Val: head.Random.Val,
    }
    nodeMap[head.Random] = rad
    cur.Random = rad
    }

    cur = cur.Next
    head = head.Next
    }
    return newNode.Next
    }
  2. 原地复制链表,从1->2->3 变成 1->1->2->2->3->3*,再去进行random赋值和链表拆分,所以一共有三个循环

    1. 原地复制当前节点,组成复制后链表
    2. 复制后节点的random指向
    3. 拆开两个链表

最后的时间复杂度:O(n),空间复杂度O(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func copyRandomList(head *Node) *Node {
if head == nil {
return nil
}
newNode := &Node{
Next:head,
}
for {
if head == nil {
break
}
next := &Node{
Val:head.Val,
Next:head.Next,
}
head.Next = next
head = next.Next
}

head = newNode.Next
for {
if head == nil {
break
}
if head.Random != nil {
head.Next.Random = head.Random.Next
}
head = head.Next.Next
}
head = newNode.Next
newHead := head.Next
newH := newHead
for {

head.Next = newHead.Next
head = head.Next
if head == nil {
break
}
newHead.Next = head.Next
newHead = newHead.Next
}

return newH
}

双向链表转二叉搜索树

题目:https://www.nowcoder.com/practice/947f6eb80d944a84850b0538bf0ec3a5?tpId=13&tqId=11179&tPage=2&rp=1&ru=%2Fta%2Fcoding-interviews&qru=%2Fta%2Fcoding-interviews%2Fquestion-ranking

题解:

  1. 用数组存中序遍历的结果,然后再重新组装(空间复杂度O(n))(不想写了
  2. 原地处理数组,我写的方法好像比较复杂一点,不过性能一样

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    func Convert( pRootOfTree *TreeNode ) *TreeNode {
    // write code here
    if pRootOfTree == nil {
    return nil
    }
    left := pRootOfTree
    if pRootOfTree.Left != nil {
    left = convLeft(pRootOfTree.Left, pRootOfTree)
    }
    if pRootOfTree.Right != nil {
    convRight(pRootOfTree.Right, pRootOfTree)
    }
    return left
    }

    func convLeft( pRootOfTree *TreeNode , before *TreeNode) *TreeNode {
    left := pRootOfTree
    if pRootOfTree.Left != nil {
    left = convLeft(pRootOfTree.Left,pRootOfTree)
    }
    if pRootOfTree.Right != nil {
    right := convRight(pRootOfTree.Right,pRootOfTree)
    before.Left = right
    right.Right = before
    } else {
    before.Left = pRootOfTree
    pRootOfTree.Right = before
    }
    return left
    }

    func convRight( pRootOfTree *TreeNode, before *TreeNode ) *TreeNode {
    right := pRootOfTree
    if pRootOfTree.Left != nil {
    left := convLeft(pRootOfTree.Left,pRootOfTree)
    before.Right = left
    left.Left = before
    } else {
    before.Right = pRootOfTree
    pRootOfTree.Left = before
    }

    if pRootOfTree.Right != nil {
    right = convRight(pRootOfTree.Right,pRootOfTree)
    }
    return right

    }
  3. 可以先找到最左的开始节点,再中序遍历就行了,符合中序遍历的核心思想

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    func Convert( pRootOfTree *TreeNode ) *TreeNode {
    // write code here
    if pRootOfTree == nil {
    return nil
    }
    var left = pRootOfTree
    for {
    if left.Left == nil {
    break
    } else {
    left = left.Left
    }
    }
    var preNode *TreeNode
    var inOrder func(root *TreeNode)
    inOrder = func(root *TreeNode) {
    if root == nil {
    return
    }
    inOrder(root.Left)
    root.Left = preNode
    if preNode != nil {
    preNode.Right = root
    }
    preNode = root
    inOrder(root.Right)
    }
    inOrder(pRootOfTree)
    return left
    }

序列化二叉树

题目:https://leetcode.com/problems/serialize-and-deserialize-binary-tree/

解法:

  1. bfs用堆,按行遍历,只要不是叶子结点并且有null 子项就输出null,反正主打一个不在乎时间空间优化,hard嘛写出来就算胜利

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    type Codec struct {
    }

    func Constructor() Codec {
    return Codec{}
    }

    // Serializes a tree to a single string.
    func (this *Codec) serialize(root *TreeNode) string {
    var resStr string
    isLeaf := this.isLeaf(root)
    var pops []*TreeNode

    pops = append(pops, root)
    for {
    if len(pops) > 0 {
    tmp := pops
    pops = []*TreeNode{}
    newLeaf := true
    for _,node := range tmp {
    resStr += ","
    if node == nil {
    resStr += "nil"
    } else {
    resStr +=strconv.Itoa(node.Val)
    if !isLeaf {
    pops = append(pops, node.Left)
    pops = append(pops, node.Right)
    newLeaf = newLeaf && this.isLeaf(node.Left) && this.isLeaf(node.Right)
    }
    }
    }
    isLeaf = newLeaf
    } else {
    break
    }
    }
    return resStr[1:]
    }

    func (this *Codec) isLeaf(root *TreeNode) bool {
    return root == nil || (root.Left == nil && root.Right == nil )
    }

    // Deserializes your encoded data to tree.
    func (this *Codec) deserialize(data string) *TreeNode {
    list := strings.Split(data, ",")
    var pops []*TreeNode
    var root *TreeNode
    if len(list) == 1 && list[0] == "nil" {
    return root
    }
    val,_ := strconv.Atoi(list[0])
    root = &TreeNode{Val:val}
    pops = append(pops,root)
    i := 1
    l := len(list)
    for {
    if len(pops) > 0 {
    tmp := pops
    pops = []*TreeNode{}
    for _,node := range tmp {
    if i < l - 1 {
    left := list[i]
    right := list[i+1]
    i += 2
    if left != "nil" {
    leftVal,_ := strconv.Atoi(left)
    ll := &TreeNode{
    Val:leftVal,
    }
    node.Left = ll
    pops = append(pops, ll)
    }
    if right != "nil" {
    rightVal,_ := strconv.Atoi(right)
    rr := &TreeNode{
    Val:rightVal,
    }
    node.Right = rr
    pops = append(pops, rr)
    }
    }
    }
    } else {
    break
    }
    }
    return root
    }
  2. 去参观一下题解,感觉核心思路大差不差放弃治疗就这样吧hard嘛,能写出来就不错了

字符串全排列

题目:https://leetcode.com/problems/permutations/description/

解法:

  1. 回溯没有什么好说的,但是我写的时候偷懒,是用了设置成11的方式来处理已经使用过的数字,这样的问题是时间复杂度会变成:O(n^n)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    func permute(nums []int) [][]int {
    var res [][]int
    var path []int
    l := len(nums)
    var per func()
    per = func() {
    if len(path) == l {
    res = append(res ,append([]int{}, path...))
    }
    for i,num := range nums {
    if num == 11 {
    continue
    }
    nums[i] = 11
    path = append(path, num)
    per()
    nums[i] = num
    path = path[:len(path) - 1]
    }
    }
    per()
    return res
    }
  2. 如果我每次都把已经处理过的交换到前面,没处理的交换到后面,那时间复杂度就会变成O(n*n!)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    func permute(nums []int) [][]int {
    var res [][]int
    var path []int
    l := len(nums)
    var per func(start int)
    per = func(start int) {
    if len(path) == l {
    res = append(res ,append([]int{}, path...))
    }
    for i:= start; i<l; i++ {
    path = append(path, nums[i])
    nums[i], nums[start] = nums[start],nums[i]
    per(start+1)
    nums[i], nums[start] = nums[start],nums[i]
    path = path[:len(path) - 1]
    }
    }
    per(0)
    return res
    }
  3. 直接递归能做么,当然可以有什么不可以的,但是每次都要把新的nums数组作为参数传进去,刚学过call的堆栈调用想想就很可怕

全排列2(有重复元素)

题目:https://leetcode.com/problems/permutations-ii/

有重复元素场景的全排列,如果没记错的话当初我应该是搞了个map嗯

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func permuteUnique(nums []int) [][]int {
var res [][]int
var path []int
var per func(start int)
l := len(nums)
per = func(start int) {
valMap := map[int]bool{}
if len(path) == l {
res = append(res, append([]int{}, path...))
}
for i:= start; i<l; i++ {
num := nums[i]
if _,ok := valMap[num]; ok {
continue
}
valMap[num] = true
path = append(path, num)
nums[i], nums[start] = nums[start], nums[i]
per(start + 1)
nums[i], nums[start] = nums[start], nums[i]
path = path[:len(path) - 1]
}
}
per(0)
return res
}

组合

题目:https://leetcode.com/problems/combinations/description/

顺手就用剪枝+回溯给做了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func combine(n int, k int) [][]int {
var res [][]int
var path []int
var list func(start int)
list = func(start int) {
for i:= start; i<=n ; i++ {
path = append(path, i)
if len(path) == k {
res = append(res, append([]int{}, path...))
} else {
list(i+1)
}
path = path[:len(path) - 1]
}
}
list(1)
return res
}

数组中出现次数超过一半的数字

题目:https://leetcode.com/problems/majority-element/submissions/1119582289/

解法:

  1. 排序,不想写也不是什么最优解,时间复杂度O(nlogn),空间复杂度O(1)
  2. 搞个map计算出现次数,时间复杂度O(n),空间复杂度O(n)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    func majorityElement(nums []int) int {
    n := len(nums)/2
    numsMap := map[int]int{}
    for _,num := range nums {
    if _,ok := numsMap[num]; !ok {
    numsMap[num] = 0
    }
    numsMap[num]++
    if numsMap[num] > n {
    return num
    }
    }
    return 0
    }

3.摩尔计数法,很有意思的东西,简单来说就是两个王国之间打架,一换一,人数多的那里赢。现在因为有一个数字是过半的,所以不一样的数字就一换一抵消,最后剩下的一定是过半的数字,时间复杂度O(n),空间复杂度O(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func majorityElement(nums []int) int {
maj := nums[0]
count := 0
for _,num := range nums {
if maj == num {
count++
} else if count == 0 {
maj = num
count++
} else {
count--
}
}
return maj
}

4.分治,把一堆数字分成两部分,则所求的数字一定是这两部分的众数之一,再合并求这堆的众数,时间复杂度O(nlogn),空间复杂度O(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func majorityElement(nums []int) int {
var maxCnt = func(lo int, up int, val int) (count int){
for i:= lo; i<= up; i++ {
if nums[i] == val {
count++
}
}
return
}
var maj func(lo int, up int) int
maj = func(lo int, up int) int {
if lo == up {
return nums[lo]
}
mid := (lo + up) / 2
left := maj(lo,mid)
right := maj(mid+1,up)
if left == right {
return left
}
return map[bool]int{true:left, false:right}[maxCnt(lo,up, left) > maxCnt(lo,up, right)]
}
return maj(0,len(nums) - 1)
}

最大的k个数

题目:https://leetcode.com/problems/kth-largest-element-in-an-array/submissions/1119616730/

解法:

  1. 最小堆嘛,老朋友了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func findKthLargest(nums []int, k int) int {
var heap []int
for _, num := range nums {
if len(heap) < k {
heap = append(heap, num)
construct(&heap, len(heap))
} else {
if num < heap[0] {
continue
} else {
heap[0] = num
update(&heap,1)
}
}
}
return heap[0]

}

func construct(heap *[]int, n int) {
if n == 1 {
return
}
val := (*heap)[n-1]
if n%2 == 1 {
father := (*heap)[(n-1)/2 - 1]
if val < father {
(*heap)[n-1], (*heap)[(n-1)/2 - 1] = (*heap)[(n-1)/2 - 1], (*heap)[n-1]
n = (n-1)/2
construct(heap, n)
}
} else {
father := (*heap)[n/2 - 1]
if val < father {
(*heap)[n-1], (*heap)[n/2 - 1] = (*heap)[n/2 - 1], (*heap)[n-1]
n = n/2
construct(heap, n)
}
}
}

func update(heap *[]int, n int) {
if n*2 > len(*heap) {
return
}
val := (*heap)[n-1]
leftPos := n*2
left := (*heap)[n*2 - 1]
if n*2 < len(*heap) {
right := (*heap)[n*2]
if right < left {
left = right
leftPos = n*2 + 1
}
}
if left < val {
(*heap)[leftPos - 1],(*heap)[n-1] = (*heap)[n-1],(*heap)[leftPos - 1]
update(heap, leftPos)
}
return
}
  1. 分治,其实就是快排,只是说如果某次快排的基准点刚好是k,就不用再排下去了,平均时间复杂度O(n),最坏情况O(n^n),空间复杂度O(1)(因为用的是快排原地交换的逻辑)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    func findKthLargest(nums []int, k int) int {
    var quickSort func(left,right int)int
    quickSort = func(left,right int) int{
    if left == right {
    return nums[left]
    }
    base := nums[left]
    basePost := left + 1
    for i:= left+1; i<= right; i++ {
    if nums[i] > base {
    nums[i],nums[basePost] = nums[basePost], nums[i]
    basePost++
    }
    }
    nums[left],nums[basePost-1] = nums[basePost-1], nums[left]
    if basePost == k {
    return base
    } else if basePost < k {
    return quickSort(basePost, right)
    } else {
    return quickSort(left, basePost-2)
    }
    }
    return quickSort(0,len(nums) - 1)
    }

数据流的中位数

题目:https://leetcode.com/problems/find-median-from-data-stream/

解法:

  1. 错误解法:二分法,能做但是时间复杂度太高
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
type MedianFinder struct {
val []int
}


func Constructor() MedianFinder {
return MedianFinder{}
}


func (this *MedianFinder) AddNum(num int) {
if len(this.val) == 0 {
this.val = append(this.val, num)
return
}
left := 0
right := len(this.val) - 1
var pos int
for {
if left >= right {
if this.val[left] >=num {
pos = left
} else {
pos = left+1
}
break
}
mid := (left + right )/2
if this.val[mid] <= num {
left = mid + 1
} else {
right = mid
}

}
rightNums := append([]int{},this.val[pos:]...)
this.val = append(this.val[:pos],num)
this.val = append(this.val, rightNums...)
}


func (this *MedianFinder) FindMedian() float64 {
ll := len(this.val)
if ll % 2 == 1 {
return float64(this.val[ll/2])
} else {
return (float64(this.val[ll/2 - 1]) + float64(this.val[ll/2])) / 2
}
}
  1. 用一个最大堆和一个最小堆来维护中位数。很可以理解的解法但是用go得再手写一遍最大堆和最小堆好累啊(以后有心情再说)(前面已经写过一次最小堆了嗯)
  2. 用红黑树,嗯红黑树。。。

最大子数组和

题目:https://leetcode.com/problems/maximum-subarray/submissions/1120161253/、

解法:

  1. 双指针

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    func maxSubArray(nums []int) int {
    var left,right int
    var max,now int
    ll := len(nums)
    now = nums[0]
    max = nums[0]
    right = 1
    for{
    if right < ll {
    if now < 0{
    for ;left<right; left++ {
    now -= nums[left]
    }
    }
    now += nums[right]
    if now > max {
    max = now
    }
    right++
    } else {
    break
    }
    }
    return max
    }
  2. dp,不过我偷懒写了个没有优化空间的方案,其实空间复杂度可以到O(1)
    $$
    sum(i) = max(f(i),sum(i-1)+f(i))
    $$
    $$
    dp(i) = max(dp(i-1), sum(i))
    $$

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    func maxSubArray(nums []int) int {
    ll := len(nums)
    merge := make([]int,ll)
    dp := make([]int,ll)
    merge[0] = nums[0]
    dp[0] = nums[0]
    for i:=1;i<ll;i++ {
    merge[i]=merge[i-1] + nums[i]
    if merge[i]<nums[i] {
    merge[i] = nums[i]
    }

    dp[i] = dp[i-1]

    if merge[i] > dp[i] {
    dp[i] = merge[i]
    }
    }
    return dp[ll-1]
    }
  3. 贪心,其实和双指针很相似但是更直接一点(也有效),只要当前和是小于0的就丢弃

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    func maxSubArray(nums []int) int {
    curSum := nums[0]
    maxVal := nums[0]
    for i:=1;i<len(nums);i++{
    if curSum < 0 {
    curSum = 0
    }
    curSum += nums[i]
    if curSum > maxVal {
    maxVal = curSum
    }
    }
    return maxVal
    }

1到n整数中1出现的次数

题目:https://leetcode.com/problems/number-of-digit-one/

一开始没有思路后来看了一眼题解,计算每一位出现的次数,豁然开朗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func countDigitOne(n int) int {
cnt := 0
cur := 1
raw := n
for {
if n == 0 {
break
}
last := n%10
n = n/10
cnt += n * cur
if last > 1 {
cnt += cur
} else if last == 1 {
cnt += (raw%cur) + 1
}
cur *= 10
}
return cnt
}

把数组排成最大的数

题目:https://leetcode.com/problems/largest-number/

题解:

  1. 一开始想的是遍历两次,按照int(a+b)>int(b+a)的顺序排起来
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    func largestNumber(nums []int) string {
    var res []string
    for k,num := range nums {
    cur := strconv.Itoa(num)
    for i,val := range res {
    if compare(cur,val) {
    last := append([]string{}, res[i:]...)
    res = append(res[:i], cur)
    res = append(res, last...)
    break
    }
    }
    if len(res) == k {
    res = append(res,cur)
    }
    }
    if res[0] == "0" {
    return "0"
    }
    return strings.Join(res,"")
    }

    func compare(a,b string) bool {
    left,_ := strconv.Atoi(a+b)
    right,_:= strconv.Atoi(b+a)
    return left > right
    }

然后发现自己只是写了个很拙劣的插入排序,其实核心是compare函数,这题本质是个排序,对所有算法都适用的

  1. 用go提供的slice排序来处理,好像是个快排,待排序数组为s:
    1
    2
    3
    sort.Slice(s,func(i,j int)bool {
    return s[i] < s[j]
    })

在这题的逻辑就是:

1
2
3
4
5
sort.Slice(s,func(i,j int)bool {
left,_ := strconv.Atoi(s[i]+s[j])
right,_:= strconv.Atoi(s[j]+s[i])
return left > right
})

简化一点其实可以直接用字典排序:

1
2
3
4
5
6
7
sort.Slice(s,func(i,j int)bool {
if s[i][0] == s[j][0] {
return s[i] + s[j] > s[j] + s[i]
} else {
return s[i] > s[j]
}
})

最后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func largestNumber(nums []int) string {
var res []string
for _,num := range nums {
res = append(res, strconv.Itoa(num))
}
sort.Slice(res, func(i,j int) bool {
if res[i][0] == res[j][0] {
return res[i] + res[j] > res[j] + res[i]
} else {
return res[i] > res[j]
}
})
if res[0] == "0" {
return "0"
}
return strings.Join(res,"")
}

把数字翻译成字符串个数

题目:https://leetcode.com/problems/decode-ways/submissions/1122667338/?utm_source=LCUS&utm_medium=ip_redirect&utm_campaign=transfer2china

怎么看都是一道dp题,不过关于0和>27的场景有很多判断条件,一定要理清楚再写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func numDecodings(s string) int {
if s[0] == '0' {
return 0
}
start := 0
before := 1
now := 1
ll := len(s)

for i:= 1; i<ll; i++ {
tmp := start
start = before
before = now
if s[i] == '0' {
if s[i-1] == '0' || s[i-1:i+1] > "27"{
return 0
} else {
now = start
}
continue
} else if s[i-1] == '0' {
now = tmp

} else if s[i-1:i+1] < "27" {
now += start
} else {
now = before
}
}
return now
}

最短路径和

题目:https://leetcode.com/problems/minimum-path-sum/submissions/

熟悉的dp again,应该说是最经典的dp之一
我这里偷了判断的懒,加了个初始化,其实用 if i == 0 j == 0也没啥问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func minPathSum(grid [][]int) int {
xlen := len(grid)
ylen := len(grid[0])
beforeRow := make([]int, ylen+1)
for j,row := range grid[0] {
beforeRow[j+1] = beforeRow[j]+row
}
beforeRow[0] = beforeRow[ylen]

for i:=1; i<xlen; i++ {
for j,val := range grid[i] {
beforeRow[j+1] = min(beforeRow[j], beforeRow[j+1]) + val
}
beforeRow[0] = beforeRow[ylen]
}
return beforeRow[ylen]
}

无重复字符最短字串

题目:https://leetcode.com/problems/longest-substring-without-repeating-characters/

滑动窗口嘛,常用办法是双指针,右指针代表当前字母,左指针代表子字符串开头,两个循环

这里用空间换了点时间,用map存当前字符串的最近一个位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func lengthOfLongestSubstring(s string) int {
posMap := make(map[rune]int, 26)
var max,preLen int
for i,d := range s {
if pos,ok := posMap[d]; ok && i-preLen <= pos{
preLen = i - pos
} else {
preLen++
if max < preLen {
max = preLen
}
}
posMap[d] = i
}
return max
}

丑数

题目:https://leetcode.com/problems/ugly-number-ii/

方案

  1. 不想写,但是可以堆排+hash去重
  2. 当 2≤i≤n2 \le i \le n2≤i≤n 时,令dp[i]=min⁡(dp[p2]×2,dp[p3]×3,dp[p5]×5)\textit{dp}[i]=\min(\textit{dp}[p_2] \times 2, \textit{dp}[p_3] \times 3, \textit{dp}[p_5] \times 5)dp[i]=min(dp[p2]×2,dp[p3]×3,dp[p5]×5),然后分别比较 dp[i]\textit{dp}[i]dp[i] 和 dp[p2]×2,dp[p3]×3,dp[p5]×5\textit{dp}[p_2] \times 2,\textit{dp}[p_3] \times 3,\textit{dp}[p_5] \times 5dp[p2]×2,dp[p3]×3,dp[p5]×5 是否相等,如果相等则将对应的指针加 111。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    func nthUglyNumber(n int) int {
    var p2,p3,p5 int
    uglyList := make([]int, n)
    uglyList[0] = 1
    for i:=1; i<n; i++ {
    var p *int
    var val int
    for {
    v2 := uglyList[p2] * 2
    v3 := uglyList[p3] * 3
    v5 := uglyList[p5] * 5
    if v2 < v3 {
    p = &p2
    val = v2
    } else {
    p = &p3
    val = v3
    }
    if val > v5 {
    p = &p5
    val = v5
    }
    *p += 1
    if val != uglyList[i-1] {
    break
    }
    }
    uglyList[i] = val
    }
    return uglyList[n-1]
    }

字符串中第一个只出现一次的字符

题目:https://leetcode.com/problems/first-unique-character-in-a-string/description/

解法:

  1. 可以用hash保存次数,再遍历一次返回次数大于一的
  2. 用hash改成用26字母数组,保存是否为单数/是单数的位置,空间复杂度O(1),时间复杂度O(n)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func firstUniqChar(s string) int {
count := make([]int, 26)
ll := len(s)+1
for i,val := range s {
if count[val-'a'] > 0 {
count[val-'a'] = ll
} else {
count[val-'a'] = i+1
}

}
min := ll
for _,val := range count {
if val != ll && val != 0 {
if min > val {
min = val
}
}
}
if min == ll {
return -1
} else {
return min-1
}
}

字符流中第一个不重复的字符

题目:https://www.nowcoder.com/practice/00de97733b8e4f97a3fb5c680ee10720?tpId=13&&tqId=11207&rp=1&ru=/activity/oj&qru=/ta/coding-interviews/question-ranking

解法:

  1. 干了一件很神奇的事情,搞了两个数组,一模一样大,第一个存第一次出现的byte,第二个存第二次出现的byte,虽然性能肯定不是最好的但是还挺好玩

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    var pops []byte
    var pops2 []byte

    func Insert(ch byte) {
    ll := len(pops)
    var flag bool
    for i:=0; i<ll; i++ {
    val := pops[i]
    if val == ch{
    flag = true
    pops2[i] = ch
    break
    }
    }
    if !flag {
    pops = append(pops, ch)
    pops2 = append(pops2, byte(0))
    }
    }

    func FirstAppearingOnce() byte {
    ll := len(pops)
    for i:=0; i<ll; i++ {
    if pops[i] != pops2[i] {
    return pops[i]
    }
    }
    return '#'
    }
  2. 第一个优化是用map保存地址,在插入的时候就不需要遍历,节约时间。第二个是获取的时候遍历数组而不是map

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    var pops []byte
    var posMap = map[byte]int{}

    func Insert(ch byte) {
    if _,ok := posMap[ch]; !ok {
    posMap[ch] = len(pops)
    pops = append(pops, ch)
    } else {
    posMap[ch] = -1
    }
    }

    func FirstAppearingOnce() byte {
    for _,b := range pops {
    if posMap[b] != -1 {
    return b
    }
    }
    return '#'
    }

逆序对

题目:https://leetcode.com/problems/reverse-pairs/
理论上也是老朋友了但是不影响写起来好陌生
用的是分治的思路,先算出每个子数组里面的逆序对数,同时排序,因为归并的求逆序对数和排序需要两次O(n),最后的时间复杂度就是O(nlogn)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func reversePairs(nums []int) int {
var num int
var mergeSort func(nums []int) []int
mergeSort = func(nums []int) []int {
ll := len(nums)
if ll == 1 {
return nums
}
mid := ll/2
rightl := ll - mid
left := mergeSort(nums[:mid])
right := mergeSort(nums[mid:])
res := make([]int, ll)
var righti,lefti int
for {
if lefti >= mid || righti >= rightl {
break
}
if left[lefti] > right[righti] * 2 {
num += mid - lefti
righti++
} else {
lefti++
}
}
lefti = 0
righti = 0
i := 0
for {
if lefti >= mid || righti >= rightl {
break
}
if left[lefti] > right[righti] {
res[i] = right[righti]
righti++
} else {
res[i] = left[lefti]
lefti++
}
i++
}
if lefti < mid {
for ; lefti <mid; lefti++ {
res[i] = left[lefti]
i++
}
} else if righti < rightl {
for ; righti <rightl; righti++ {
res[i] = right[righti]
i++
}
}
return res
}
mergeSort(nums)
return num

}

相交链表

题目:https://leetcode.com/problems/intersection-of-two-linked-lists/
解法1: 用哈希记录已经访问节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func getIntersectionNode(headA, headB *ListNode) *ListNode {
seenHash := map[*ListNode]bool{}
aHead := headA
for {
if aHead != nil {
seenHash[aHead] = true
aHead = aHead.Next
}else {
break
}
}
for {
if headB == nil {
return nil
}
if _,ok := seenHash[headB]; ok {
return headB
} else {
headB = headB.Next
}
}
return nil
}

解法2: 核心思路是消除长度差,不过我的脑回路比较简单,就是先算出两个链表的长度差,再抹平,再遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* Definition for singly-linked list.
* type ListNode struct {
* Val int
* Next *ListNode
* }
*/
func getIntersectionNode(headA, headB *ListNode) *ListNode {
aHead := headA
bHead := headB
var aTimes,bTimes int
for {
aTimes++
if aHead.Next == nil {
break
}
aHead = aHead.Next

}
for {
bTimes++
if bHead.Next == nil {
break
}
bHead = bHead.Next

}
if aHead != bHead {
return nil
}
var left int
if aTimes > bTimes {
t := aTimes - bTimes
left = bTimes
for i:=0;i<t;i++ {
headA = headA.Next
}
} else {
t := bTimes - aTimes
left = bTimes
for i:=0;i<t;i++ {
headB = headB.Next
}
}

for i:=0; i<left; i++{
if headA == headB {
return headA
}
headA = headA.Next
headB = headB.Next
}
return nil
}

解法3:本质还是抹平长度差,但是解法讨巧很多,当A结尾后指向B,当B结尾后指向A,相当于两个链表各循环了一次,天然抹平了长度差

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func getIntersectionNode(headA, headB *ListNode) *ListNode {
aHead := headA
bHead := headB
for {
if aHead == bHead {
return aHead
}

if aHead == nil {
aHead = headB
} else {
aHead = aHead.Next
}
if bHead == nil {
bHead = headA
} else {
bHead = bHead.Next
}

}
return nil
}

排序数组中数字的开始和结束节点

题目:https://leetcode.com/problems/find-first-and-last-position-of-element-in-sorted-array/

核心肯定是二分法

解法1:二分法,分别查找上界和下界,两次查找O(logn),可以关注一下找上界和下界的边界处理方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func searchRange(nums []int, target int) []int {
res := make([]int,2)
ll := len(nums) - 1
start := 0
end := ll
for {
if start > end {
break
}
mid := start + (end - start)/2
if nums[mid] >= target {
end = mid - 1
} else {
start = mid + 1
}
}

if start > ll || nums[start] != target {
return []int{-1,-1}
}
res[0] = start
end = ll
for {
if start > end {
break
}
mid := start + (end - start)/2
if nums[mid] <= target {
start = mid + 1
} else {
end = mid - 1
}
}
res[1] = end
return res
}

解法2: 只用一个上界/下界的函数,比如找上界,那就是分别找target和target - 1的上界,主要的好处是只用了一个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func searchRange(nums []int, target int) []int {
ll := len(nums) - 1
low := upper(nums, target - 1)
if low >= ll || nums[low+1] != target {
return []int{-1,-1}
}
high := upper(nums, target)
return []int{low+1, high}
}

func upper(nums []int, target int) int{
start := 0
end := len(nums) - 1
for {
if start > end {
break
}
mid := start + (end - start) / 2
if nums[mid] <= target {
start = mid + 1
} else {
end = mid - 1
}
}
return end
}

求缺失数字

题目:https://leetcode.com/problems/missing-number/

求从0开始的一个数组中缺少的那个数字,用异或做就行

1
2
3
4
5
6
7
8
9
func missingNumber(nums []int) int {
ll := len(nums)
res := ll
for i:=0; i<ll; i++ {
res ^= i
res ^= nums[i]
}
return res
}

二叉搜索树第k大的值

题目:https://leetcode.com/problems/kth-smallest-element-in-a-bst/

很显然也很典型的中序遍历:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func kthSmallest(root *TreeNode, k int) int {
var res int
var midTraversal func(root *TreeNode)
midTraversal = func(root *TreeNode){
if root == nil {
return
}
midTraversal(root.Left)
k -= 1
if k == 0 {
res = root.Val
return
}
midTraversal(root.Right)
}
midTraversal(root)
return res
}

看到了一种很骚的利用go管道的做法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func kthSmallest(root *TreeNode, k int) int {
c := make(chan int)
go midTraversal(root,c)
for i:=0;i<k-1;i++ {
<-c
}
return <-c
}
func midTraversal (root *TreeNode, c chan int){
if root == nil {
return
}
midTraversal(root.Left,c)
c <- root.Val
midTraversal(root.Right,c)
}

也就是每次遍历写了值就会直接被取出,直到写入第k个的时候函数返回。

会不会出现内存泄漏?

我理解应该是会的,因为通道的存活时长和程序是一样的,也就是只有在main函数执行结束程序退出才会销毁,所以oj还比较简单,毕竟只有这么点自测用力,如果是个后台服务就原地表演一个g了

二叉树深度

题目:https://leetcode.com/problems/maximum-depth-of-binary-tree/solutions/2817503/golang-solution/

一个dfs或者bfs,好像也翻不出什么花来,写个dfs的

1
2
3
4
5
6
7
8
9
10
11
12
func maxDepth(root *TreeNode) int {
if root == nil {
return 0
}
left := maxDepth(root.Left)
right := maxDepth(root.Right)
if left > right {
return left + 1
}else {
return right + 1
}
}

bfs本来是不想写的,但是看到了一个list包,是一个go的链表的包,也可以用来当成队列处理,这样以后写队列会顺手很多,不用把数组复制来复制去的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func maxDepth(root *TreeNode) int {
if root == nil {
return 0
}
queue := list.New()
var level int
queue.PushBack(root)
for queue.Len() > 0{
ll := queue.Len()
for i:=0; i<ll; i++ {
e := queue.Front()
queue.Remove(e)
node := e.Value.(*TreeNode)
if node.Left != nil {
queue.PushBack(node.Left)
}
if node.Right != nil {
queue.PushBack(node.Right)
}
}
level++
}
return level
}

判断平衡树

题目:https://leetcode.com/problems/balanced-binary-tree/

题解:好像也没啥好说的就是个dfs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func isBalanced(root *TreeNode) bool {
isB := true
var checkHeight func(root *TreeNode) int
checkHeight = func(root *TreeNode) int {
if root == nil {
return 0
}
ll := checkHeight(root.Left)
rr := checkHeight(root.Right)
if ll - rr > 1 || ll - rr < -1 {
isB = false
}
if ll < rr {
return rr + 1
} else {
return ll + 1
}
}
checkHeight(root)
return isB
}

数组中只出现一次的两个数字

题目:https://leetcode.com/problems/single-number-iii/

题解:

这题题解还挺有意思的,这题和136唯一不同就是此题有两个出现了一次的数, 所以我们按照136题类似的遍历一遍数组然后得到所有数字异或结果是待求两个数(设为res1和res2)的异或结果res1^res2, 所以我们不能直接这样做. 既然同时存在两个只出现过一次的数, 那么如果我们将数组分成两部分且这两部分数组分别包含了res1和res2, 然后再采用136完全一样的思路不就解决了吗。

所以问题转换成如何将数组分成两个部分且这两部分分别包含res1和res2. 还是利用位操作, 我们可以根据res1和res2不同的某一位来划分数组, 例如若二者第3位分别为0和1, 那么我们就根据”第三位为0还是为1”来划分数组. 我们不妨取二者不同位的最低(即最右)的那位, 即二者异或结果res1^res2最低位的1. 设res=res1^res2, 那么res&(-res)即res最低位的1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func singleNumber(nums []int) []int {
var res int
for _,val := range nums {
res ^= val
}
bitNum := res & (-res)
var res1,res2 int
for _,val := range nums {
if val & bitNum > 0 {
res1 ^= val
} else {
res2 ^= val
}
}
return []int{res1,res2}
}

数组中唯一只出现一次的数字

题目:https://leetcode.com/problems/single-number-ii/

这题最简单粗暴的方式就是搞一个map,存每个数字出现的次数,出现次数为1的就是返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func singleNumber(nums []int) int {
mapNum := map[int]int{}
for _,num := range nums {
mapNum[num]++
}
for k,v := range mapNum{
if v > 1 {
continue
} else {
return k
}
}
return 0
}

但是这样要占用的空间确实也很大,所以按照前面的不重复数字解法来看,这题用到的应该也是位运算。一个数一共32位,计算每位1出现的次数,最后总数mod3,余下的就是唯一的单数个数字在这位的位数了。思路很简单~但是写的时候要看一下得用int32

1
2
3
4
5
6
7
8
9
10
11
12
func singleNumber(nums []int) int {
var res int32
for i:=0;i<32;i++ {
var val int
for _,num := range nums {
val += (num>>i) & 1
}
cnt := val%3
res |= int32(cnt<<i)
}
return int(res)
}

还有一种数电思路,但是说实话数电那个时候都拿来学法语课了。。。

和为s的两个数字

题目:https://leetcode.com/problems/two-sum-ii-input-array-is-sorted/solutions/3267180/c-go-two-approaches/

解法:标准的双指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func twoSum(numbers []int, target int) []int {
rr := len(numbers) - 1
ll := 0
for ll < rr {
sum := numbers[ll] + numbers[rr]
if sum == target {
return []int{ll+1,rr+1}
} else if sum > target {
rr--
} else {
ll++
}
}
return []int{-1,-1}
}

和为s的连续正整数序列

题目:https://leetcode.com/problems/consecutive-numbers-sum/solutions/4416081/golang-solution-clean-code-easy-to-understand/

这题本质是个数学题,写个等差数列求和公式放在这里:
$$
\sum^y_{i=x,n=k} x= k(x+y)/2
$$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func consecutiveNumbersSum(n int) int {
cnt := 1
i := 2
for isConsecutive(i,n){
mid := n/i
if i%2 == 1 && i*mid == n {
fmt.Printf("i:%d,mid:%d",i,mid)
cnt++
} else if i%2 == 0 && i*(mid*2+1) == 2 * n {
fmt.Printf("i:%d,mid:%d",i,mid)
cnt++
}
i++
}
return cnt
}

func isConsecutive(i,k int) bool {
if i * (i+1) <= 2*k {
return true
} else {
return false
}
}

翻转单词顺序

题目:https://leetcode.com/problems/reverse-words-in-a-string/

一开始用了暴力求解,遍历扫一遍字符串,把每个单词放进栈里,最后再出栈。这个思路是没有问题的,但是首先要多余的空间复杂度,其次要多次遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func reverseWords(s string) string {
var stack [][]byte
var res string
i:=0
ll:=len(s)
var tmpStr []byte
for i<ll{
if s[i] == ' ' {
if len(tmpStr) > 0{
stack = append(stack, tmpStr)
tmpStr = []byte{}
}
} else {
tmpStr = append(tmpStr,s[i])
}
i++
}
if len(tmpStr) > 0{
stack = append(stack, tmpStr)
}
for j:= len(stack) - 1; j>=0;j-- {
res += string(stack[j])
res += " "
}
return strings.TrimRight(res, " ")
}

后面改成了双指针,从后往前

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func reverseWords(s string) string {
rr := len(s) - 1
ll := rr
var res []byte
for rr >=0 {
for rr >= 0 && s[rr] == ' ' {
rr--
ll--
}

for ll>=0 && s[ll] != ' ' {
ll--
}
res = append(res, s[ll+1:rr+1]...)
res = append(res, ' ')
rr = ll
ll = rr
}
return strings.TrimRight(string(res), " ")
}

刷题碎碎念一:环形链表快慢指针之为什么

Posted on 2023-12-08 | In leetcode

背景

用快慢指针找环形链表是个非常非常非常常见的题型,之前在leetcode find duplicate number 见到了环形链表的解法,就顺手算了一下,算完给自己留个纪念(毕竟这可能是第三次算这东西了。。。)(每次都像是什么崭新的知识点)

快慢指针找环形链表解法

过分经典在这里放个链接顺便随便说说吧:https://leetcode.cn/problems/c32eOV/solutions/1037744/lian-biao-zhong-huan-de-ru-kou-jie-dian-vvofe/

简单来说就是有一个快指针fast和一个慢指针slow,快指针每次往前走两步,慢指针每次往前走一步,如果两个指针最终相遇,那就说明这个链表是个环形链表

那怎么找入环口呢?

在两个节点相遇之后,掏出一个新的指针 ptr 指向链表头部,然后和 slow 每次往前走一步,这两个指针就会在入环点相遇

原理

感觉真的算了很多次了但是每次都记不住QAQ

来随便写个个链表:

HEAD–B–…–C–Enter–D–…–Enter

假设:

  • 从head到Enter(不包含)一共a步
  • 环内一共 b 节点(包含enter)
  • 环内相遇时包含Enter和相遇节点一共距离enter x 节点
  • $\alpha$ 和 $\beta$ 是模数,任意

那么当fast和slow相遇时,可以写个公式:

$a + x + \alpha b = 2(a+x)$

所以也就可以得知: 只要有一个 $x >= 1 \&\& x <= b$ 时满足:当 $a + x = \alpha b$ 就可以

那就很简单了,$x=b - (a \bmod b)$

再来看一下找入环口这件事情的公式,一共走了a+1步:

$a + 1 + x = \beta b + 1$

看 $a + x = \alpha b$,这个事情就是很自然而然的了对不对

好了欢乐结束了希望不要再忘了

redis学习笔记五:其他独立功能

Posted on 2023-04-26 | In redis

事务

开启事务:MULTI命令切换至事务状态

非事务状态:来一个get/set命令,立刻执行

事务状态:

  • 如果客户端发送的命令为 EXEC 、 DISCARD 、 WATCH 、 MULTI 四个命令的其中一个, 那么服务器立即执行这个命令
  • 如果不是这四个命令,扔进事务队列

事务队列

每个 Redis 客户端都有自己的事务状态, 这个事务状态保存在客户端状态的 mstate 属性里面:

1
2
3
4
5
6
7
8
9
10
typedef struct redisClient {

// ...

// 事务状态
multiState mstate; /* MULTI/EXEC state */

// ...

} redisClient;

事务状态包含一个事务队列, 以及一个已入队命令的计数器 (也可以说是事务队列的长度):

1
2
3
4
5
6
7
8
9
typedef struct multiState {

// 事务队列,FIFO 顺序
multiCmd *commands;

// 已入队命令计数
int count;

} multiState;

执行EXEC

当一个处于事务状态的客户端向服务器发送 EXEC 命令时, 这个 EXEC 命令将立即被服务器执行: 服务器会遍历这个客户端的事务队列, 执行队列中保存的所有命令, 最后将执行命令所得的结果全部返回给客户端。

执行失败:

当事务中的某个命令执行失败的时候,redis不会返回报错也不会回滚,而是会执行后面的命令

监听Watch

cas和aba问题

背景:cas和aba问题

cas:compare and swap,为了解决并发场景下同时访问同一个共享变量时导致的冲突问题,比如a在改变量env的时候b正好在改它

当然了也可以用锁,如果几个线程都是读线程,挨个获取锁就消耗了大量的时间和block的线程资源,所以锁也被称为悲观锁,就是假设了一种悲观场景,每次访问共享变量都会遇到冲突

cas就是很常用的乐观锁机制,它其实在代码逻辑上并没有上锁并且乐观的认为大部分人来访问它就是为了读而已,遇到冲突是很少的场景

CAS 的思想很简单:三个参数,一个当前内存值 V、旧的预期值 A、即将更新的值 B,当且仅当预期值 A 和内存值 V 相同时,将内存值修改为 B 并返回 true,否则什么都不做,并返回 false

在这里抄一点来自java的cas使用代码:

1
2
3
4
5
6
7
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = this.getIntVolatile(o, offset);
} while(!this.compareAndSwapInt(o, offset, v, v + delta));
return v;
}

可以看到,利用cas来实现原子操作而不是真的搞个自旋锁,去比较获取的值和预期是否一致,一致就认为没有其他线程在改动变量,当然了cas底层的compare和swap操作本身还是可以用自旋锁去实现的

当然了就想在多读少写的情况下用自旋锁太耗时一样,如果是多写少读的场景就会有很多线程陷入无限循环,会给cpu带来大量的开销

aba问题:当另一个线程先把变量env=a改成env=b,又改成了env=a,而改动因为执行速度很快恰好没有被捕捉到,那还是会出现多个线程同时改一个变量的场景

当然为了解决aba问题不同的系统用了不同的改进方法,比如java就加了版本号的概念,每次修改也会把版本号+1

watch

再回到redis,redis事务中watch的执行就是用了类似cas的乐观锁机制,不过它不是在执行的时候去compare,而是,肤浅的理解就是从watch指令开始就盯住对应的变量,一旦变量发生改动就通知客户端,取消事务,也就避免了aba问题

注意:在redis中只要exec开始执行就不会再有回滚之类的任何操作,所以watch的校验都是在exec之前的,exec执行后就会取消对所有键的监控,所以watch的意义是阻止执行

给个例子:

1
2
3
4
5
6
WATCH name
val = GET mykey
val = val + 1
MULTI
SET mykey $val
EXEC

lua

不想看用到再说

redis学习笔记四:多机数据库

Posted on 2023-04-13 | In redis

复制

同步

  1. 从服务器发送sync请求到主服务器
  2. 主服务器生成RDB,并把之后的写命令都扔进一个缓冲区
  3. 主服务器把RDB发给从服务器,从服务器初始化
  4. 主服务器把缓冲区命令发给从服务器,从服务器执行,使得两边数据一致

命令传播

同步是在从服务器创建的时候从主服务器同步数据,命令传播是在主服务器的数据变动的时候把写命令同步给从服务器的行为

问题

断线导致丢失数据的情况要怎么办?

– 同步:重同步

完整重同步和同步是一样,就是用在初始化同步数据

部分重同步则是用来处理断线后重复制的情况

部分重同步用于处理断线后重复制的情况:当从服务器在断线后重新连接到主服务器时,如果条件允许,主服务器可以将断线后这一段时间执行的血命令发送给从服务器,从服务器只接收并执行这些写命令,就可以将从服务器数据库的状态更新至主服务器当前的状态。

  1. 偏移量:主服务器和从服务器分别维护一个复制偏移量,主服务器发N个消息就给自己的偏移量+N,从服务器收到N个就把自己的偏移量+N,这样一对就知道少了啥了
  2. 复制积压缓冲区:一块大小固定的缓冲区,主服务器会把自己收到的写命令和对应的offset都存在里面,对了一下偏移量,如果对应偏移量的数据还在缓冲区就直接把后面的指令给从服务器,不在缓冲区的话就执行完整重同步
  3. 服务器运行ID:从服务器在初始化复制的时候会把对应的主服务器id存下来,从服务器断线重启之后向主服务器发请求,如果id和主服务器id是同一个,就执行部分重同步,不一样(不大理解为啥会不一样)就完整重同步

– 命令传播:心跳检测

在命令传播阶段,从服务器会每秒给主服务器发一个当前offset

  1. 监测连接状态,一秒以后没收到连接可能就有问题了
  2. 如果连接正常的从服务器少于一个数量或者延迟很大的时候,主服务器可以拒接写入
  3. offset也可以用来监测是不是丢命令了

优缺点

优点很明显,就是主从实现读写分离嘛,减少了主服务器的压力

缺点也很明显,主服务器挂了就没了,没有任何的容错能力,而且也没办法支持扩容之类的操作

Sentinel

唉又是那个毛病,开头能不能先介绍一下sentinel是啥

首先假定(好像不是假定)redis的集群模式是master-slave(好像确实就是),如果master挂了,需要再选一个master节点出来,并且把之前master的slave都转移到新的master头上,这个系统就叫做哨兵系统(sentinel)

当然了同时我们就可以利用sentinel系统监控各个节点的特征,获取节点信息配置监控告警等

受不了了怎么还看到了raft,感觉自己在看hdfs和zk,sentinel集群和zk集群感觉其实真的差不多

TBD:

有空自己开一下sentinel模式试试看

简单来说就是sentinel系统本身就是一个节点,但是这个节点和普通redis节点不一样的是它开启了sentinel模式,同时command表也就不支持set之类的操作,而是pong这样的sentinel模式需要的命令

启动连接

sentinel启动的最后一步是向被监视的主服务器建立网络连接,sentinel会建立两种异步网络连接:

  • 一个是命令连接, 这个连接专门用于向主服务器发送命令, 并接收命令回复。
  • 另一个是订阅连接, 这个连接专门用于订阅主服务器的 __sentinel__:hello 频道。

关于订阅连接究竟是为什么要出现,各种地方的解释不一样,有的文档(包括这本Redis设计与实现)说订阅连接是为了防止丢失信息,因为订阅连接是个长连接,所以即使命令连接的返回数据丢了,也可以从订阅连接里收到。另一个说法是订阅连接可以帮助sentinel发现其他sentinel,从而建立Sentinel集群,姑且我们认为两种都有吧

获取slave节点

Sentinel 以每十秒一次的频率向被监视的主服务器发送 INFO 命令,INFO命令会返回改主服务器下从服务器的信息。获取到从服务器的信息后,sentinel就可以向从服务器建立命令连接和订阅连接

建立Sentinel集群

默认情况下,Sentinel每2s一次,向所有被监视的主服务器和从服务器所订阅的sentinel:hello频道上发送消息,消息中会携带Sentinel自身的信息和主服务器的信息。这个信息先叫他频道消息。

因为每个sentinel都会向每个监视的服务器发频道消息,所以对于监视同一个服务器的多个sentinel来说,sentinel可以通过频道消息感知到其他sentinel的存在。

当Sentinel通过频道信息发现一个新的Sentinel时,它不仅会为新Sentinel在sentinels字典中创建相应的实例结构,还会创建一个连向新Sentinel的命令连接(但是不创建订阅连接)

感想就是如果还想研究细节可以去再学一遍zk

发现下线

主观下线:

默认情况下,Sentinel每秒一次向所有与它建立了命令连接的实例(包括主服务器、从服务器和其他Sentinel)发送PING命令,并根据回复判断实例是否在线。如果在Sentinel配置文件中的down-after-milliseconds毫秒内,连接向Sentinel返回无效回复,那么Sentinel就会认为该实例主观下线(SDown)。

sentinel认为这个服务器主观下线之后,就会去检查它是不是真的下线了,也就是客观下线:

为了确认是否真的下线,这个Sentinel会向同时监控这个主服务器的所有其他Sentinel发送查询命令,判断它们是否也任务主服务器下线(包括主观下线和客观下线)。如果达到Sentinel配置中的quorum数量的Sentinel实例都判断主服务器为主观下线,则该主服务器就会被判定为客观下线。

leader sentinel选举

看到了吗熟悉的raft

说起来在这里放个raft的论文网址:https://web.stanford.edu/~ouster/cgi-bin/papers/raft-atc14 以后方便找

稍微简单解释一下,就是判定服务器为客观下线的sentinel节点可以发起选举,选举拿到半数以上赞成票并且超过一个设定阈值(quorum 值这个东西看上去像redis原创的?)就能当leader

至于具体的投票限制平票方案等等还是去看raft吧,反正就是选了个leader sentinel出来

故障转移

  1. 选新master。当然是有一些条件的
    1. 过滤掉下线节点,过滤掉最近5s没有回复sentinel info的节点
    2. 选conf配置里设定优先级高的节点
    3. 优先级一样的话选复制偏移量最大的
  2. 将所有从服务器改为复制新的主服务器。
  3. 将已下线的主服务器设置为新的主服务器的从服务器。

优缺点

优点:

  1. master-slave的运行本身基于主从复制模式,所以该有的还是有
  2. master挂掉就可以自动切换

缺点:

  1. 扩容还是不支持,容量还是依赖master节点机器配置
  2. 不知道算不算缺点的缺点,运行sentinel集群需要消耗额外的资源

集群

也来简单介绍一下,集群模式和sentinel模式是两种不一样的模式,集群模式的逻辑就是去中心化,每个节点都和集群内其他节点有连接,集群内部也可以设置主从,比如六个节点的话可以设置三主三从,主挂了从将自动变成主节点,但是和前面的读写分离不大一样,这里从节点主要是拿来做热备的,因为分布式存储本身就解决了吞吐量的问题(个人看书的理解

同时集群和前面两种模式不一样的是,每个主节点保存的东西都是不一样的,也就是终于转变为了分布式存储(可喜可贺)

集群本身的启动,集群的搭建和主从设置都是要在启动的时候通过配置实现的。

连接各个节点的工作可以使用 CLUSTER MEET 命令来完成, 该命令的格式如下:

1
CLUSTER MEET <ip> <port>

我猜哈我猜,在配置里配好集群节点之后,每个节点启动的时候会执行cluster meet,将对应的节点加到自己所属的集群中

启动节点

一个节点就是一个运行在集群模式下的 Redis 服务器, Redis 服务器在启动时会根据 cluster-enabled 配置选项的是否为 yes 来决定是否开启服务器的集群模式。

集群模式下其他都是都是正常运行,同时serverCon会调用clusterCron 函数来执行在集群模式下需要执行的常规操作

集群相关的信息会被保存在 cluster.h/clusterNode 结构, cluster.h/clusterLink 结构, 以及 cluster.h/clusterState 结构里面

clusterNode

每个节点都会使用一个 clusterNode 结构来记录自己的状态, 并为集群中的所有其他节点(包括主节点和从节点)都创建一个相应的clusterNode 结构, 以此来记录其他节点的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct clusterNode {

// 创建节点的时间
mstime_t ctime;

// 节点的名字,由 40 个十六进制字符组成
// 例如 68eef66df23420a5862208ef5b1a7005b806f2ff
char name[REDIS_CLUSTER_NAMELEN];

// 节点标识
// 使用各种不同的标识值记录节点的角色(比如主节点或者从节点),
// 以及节点目前所处的状态(比如在线或者下线)。
int flags;

// 节点当前的配置纪元,用于实现故障转移
uint64_t configEpoch;

// 节点的 IP 地址
char ip[REDIS_IP_STR_LEN];

// 节点的端口号
int port;

// 保存连接节点所需的有关信息
clusterLink *link;

// ...

};

clusterLink

clusterLink保存的是连接到节点所需要的连接信息,还挺明显的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typedef struct clusterLink {

// 连接的创建时间
mstime_t ctime;

// TCP 套接字描述符
int fd;

// 输出缓冲区,保存着等待发送给其他节点的消息(message)。
sds sndbuf;

// 输入缓冲区,保存着从其他节点接收到的消息。
sds rcvbuf;

// 与这个连接相关联的节点,如果没有的话就为 NULL
struct clusterNode *node;

} clusterLink;

clusterState

最后, 每个节点都保存着一个 clusterState 结构, 这个结构记录了在当前节点的视角下, 集群目前所处的状态 —— 比如集群是在线还是下线, 集群包含多少个节点, 集群当前的配置纪元

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
typedef struct clusterState {

// 指向当前节点的指针
clusterNode *myself;

// 集群当前的配置纪元,用于实现故障转移
uint64_t currentEpoch;

// 集群当前的状态:是在线还是下线
int state;

// 集群中至少处理着一个槽的节点的数量
int size;

// 集群节点名单(包括 myself 节点)
// 字典的键为节点的名字,字典的值为节点对应的 clusterNode 结构
dict *nodes;

// ...

} clusterState;

数据分片

redis集群引入了哈希槽(hash slot)的概念,Redis 集群有16384 个哈希槽,每个节点负责一部分hash槽,每个key通过对16384取mod来决定自己的数据放在哪个hash槽,这样在节点增加或删除的时候,只要通过改变一部分hash槽的归属就可以实现

请求重定向

作为一个去中心化的东西,自然没有一个路由器专门接收请求根据slot扔给对应的节点,无论Redis 的客户端访问集群中的哪个节点都可以路由到对应的节点上

  1. 客户端算了一下觉得我应该访问的是节点1(比较智能的客户端本地会存一个slot到节点的map)
  2. 对应的slot已经从节点一移走了,但是节点1的clusterNode里存了每个节点和槽对应的关系,所以返回一个MOVED,告诉客户端应该去节点2
  3. 对应的slot正好在从节点1挪到节点2,节点1会返回一个ASK和节点2的地址,客户端接到返回之后去问节点2数据是否在,节点2返回是否

节点间通信

MEET

也就是建立集群的握手

PING PONG

ping:

用于交换节点的元数据。每个节点每秒会向集群中其他节点发送 ping 消息,消息中封装了自身节点状态还有其他部分节点的状态数据,也包括自身所管理的槽信息等等。

这里的部分节点至少包含 3 个其它节点的信息,最多包含 (总节点数 - 2)个其它节点的信息。

pong:

meet和ping协议的响应,同样包含节点状态还有其他部分节点的信息

Gossip协议

看了一眼机制也挺眼熟的(怎么感觉以前学通信学过),通过ping pong每个节点可以交换部分集群节点的数据,如果每个节点都定时挑几个节点去交换一下数据,最后就会获取整个集群的数据。而不是每个节点都要和每个节点建立通信,这样集群量大的时候通信压力也太大了

缺点大概就是信息会有滞后吧

节点多的话虽然集群通信压力可能小,但是信息滞后会导致重定向次数和概率变高,最后压力还是会大

集群扩容和收缩

其他都不重要(才没有懒得去仔细研究呢),主要是slot配置和slot内数据在节点的移动

  1. 扩容之后,需要有老的节点对新节点发cluster meet,让他加入新节点

图是csdn上抄的,https://blog.csdn.net/a745233700/article/details/112691126,看起来画的很清楚

节点故障下线和恢复

和sentinel一样,分为主观下线和客观下线

当故障节点下线后,如果是持有槽的主节点则需要在其从节点中找出一个替换它,从而保证高可用。然后就回到了前面的sentinel模式

备注:如果集群中某个节点的master和slave节点都宕机了,那么集群就会进入fail状态,因为集群的slot映射不完整。如果集群超过半数以上的master挂掉,无论是否有slave,集群都会进入fail状态。

总结

简单来说,主从复制利用读写分离解决的是吞吐量的问题,sentinel模式在其基础上解决了节点宕机导致服务不可用的问题,而集群在此基础上利用分布式存储解决了动态缩扩容的问题

redis学习笔记三:单机数据库

Posted on 2023-04-11 | In redis

redisDB

简介

1
2
3
4
5
6
7
8
9
10
typedef struct redisDb {

// ...

// 数据库键空间,保存着数据库中的所有键值对
dict *dict;

// ...

} redisDb;

里面的字典结构被称为 key space

好像就没什么好讲的了毕竟dict结构前面夜看到过emmmm,放张图吧

读写时的维护操作

  • 在读取一个键之后(读操作和写操作都要对键进行读取), 服务器会根据键是否存在, 以此来更新服务器的键空间命中(hit)次数或键空间不命中(miss)次数, 这两个值可以在 INFO stats 命令的 keyspace_hits 属性和 keyspace_misses 属性中查看。
  • 在读取一个键之后, 服务器会更新键的 LRU (最后一次使用)时间, 这个值可以用于计算键的闲置时间, 使用命令 OBJECT idletime 命令可以查看键 key 的闲置时间。

lru这个对象里提到过,可以计算闲置时间也可以用于一些算法的内存回收,hit数的作用则尚且不明确

  • 如果有客户端使用 WATCH 命令监视了某个键, 那么服务器在对被监视的键进行修改之后, 会将这个键标记为脏(dirty), 从而让事务程序注意到这个键已经被修改过, 《事务》一章会详细说明这一点。
  • 服务器每次修改一个键之后, 都会对脏(dirty)键计数器的值增一, 这个计数器会触发服务器的持久化以及复制操作执行, 《RDB 持久化》、《AOF 持久化》和《复制》这三章都会说到这一点。
  • 如果服务器开启了数据库通知功能, 那么在对键进行修改之后, 服务器将按配置发送相应的数据库通知, 本章稍后讨论数据库通知功能的实现时会详细说明这一点。

这三个后面回提到的东西先记下,等提到再callback吧

持久化

一切的开始之前先讲一下redis持久化是个什么东西(也是四处看博客的总结而已)

像我们的mysql这种db是不需要持久化的,它本来就是存储在硬盘里的数据,经过关机重启之类的操作不会丢也不会怎么滴

但是redis是一种内存的存储,内存这种东西在重启之后会直接清空,在服务器进程结束重新启动之后需要持久化来恢复原来的redis存储。方案有两种,一个是RDB,一个是AOF

RDB

RDB就是将某一时刻的数据集,用一种非常紧凑的格式的文件,保存下来(有点像快照)

但是很明显,如果没执行一次写操作就同步rdb,那效率就过于离谱了,可以用于数据集的备份甚至是版本控制

上面是一个RDB文件的结构,下面是里面的一个数据库的结构

SELECTDB 常量的长度为 1 字节, 当读入程序遇到这个值的时候, 它知道接下来要读入的将是一个数据库号码

下面是key_value_pairs的结构

  1. 如果没有过期时间的话前两格没有,直接从类型开始
  2. TYPE 记录了 value 的类型, 长度为 1 字节,也就是上面对象里面对象类型或者底层编码,比如REDIS_RDB_TYPE_STRING

键值对value结构

https://www.w3cschool.cn/hdclil/yohs7ozt.html

啊一个一个写感觉也没有必要,里面保存的其实就是前面讲到的对象结构,要用的或者关心的时候回来看一眼吧,包括具体的保存结构,压缩方案之类的

TBD

RDB的产生和update逻辑是什么样子的?

AOF

append only file

AOF文件会把所有的写入动作都存进一个文件,并在服务器启动时,通过重新执行这些命令来还原数据集。

命令追加

就是客户端每做一个会改变存储的操作:比如set或者update,就会追加一个操作到缓冲区

文件写入

redis的eventloop,在每次接收客户端请求进行操作之后,都会考虑是否将aof缓冲区(aof_buf)的文件写入和保存到aof文件里

flushAppendOnlyFile 函数的行为由服务器配置的 appendfsync 选项的值来决定

appendfsync 选项的值 flushAppendOnlyFile 函数的行为
always 将 aof_buf 缓冲区中的所有内容写入并同步到 AOF 文件。
everysec 将 aof_buf 缓冲区中的所有内容写入到 AOF 文件, 如果上次同步 AOF 文件的时间距离现在超过一秒钟, 那么再次对 AOF 文件进行同步, 并且这个同步操作是由一个线程专门负责执行的。
no 将 aof_buf 缓冲区中的所有内容写入到 AOF 文件, 但并不对 AOF 文件进行同步, 何时同步由操作系统来决定。

文件写入数据丢失问题

aof操作是写在内存缓冲区的,那么如果突然宕机了又改怎么办

不同的函数配置丢失的数据就不一样,always最多丢失的就是一个eventloop内的缓冲区指令。everysec丢失的是一秒的指令,no则会丢失上次同步之后写入的所有的数据

但是相反,always因为每次写入之后都要同步,所以写入效率是最低的,no无需执行同步操作,所以写入效率自然也快很多

aof可以很好地解决数据同步和避免数据丢失的问题,但是同时aof的文件大小会随着redis运行的时长变得非常非常非常离谱=》怎样把rdb和aof组合?

事件

背景

io多路复用:i/o multiplexing

简单一点来说就和我们go channel的select逻辑很像,只不过从同时监听多个channel变成了同时监听多个fd,不需要一个线程一个fd这么消耗资源还不好维护

select 和 epoll的区别:后面再说吧不属于redis范畴

Reactor模式

用多路复用的规则去监听事件,监听到了之后就扔给handler去处理事件(简单来说就是这样

文件事件

发现了redis设计与实现这本书一个诡异的问题,经常是上来连这个东西是什么干什么用的都不解释就哐哐哐一顿原理输出

文件事件是redis服务端通过socket连接客户端,并且处理客户端操作的抽象

  1. 每个套接字执行连接,读取,写入,关闭等操作
  2. 每个操作产生一个文件事件,被io多路复用程序写入队列扔给分派器
    1. 因为io多路复用本质是个block逻辑,所以只有一个套接字内的文件事件都结束了,多路复用程序才会执行下一次event loop并且把下一个套接字的事件扔进队列
  3. 分派器会根据文件事件类型调度不同的处理器去处理

多路复用的函数都是套了linux epoll,select之类的 函数

Redis 为文件事件编写了多个处理器, 这些事件处理器分别用于实现不同的网络通讯需求, 比如说:

  • 为了对连接服务器的各个客户端进行应答, 服务器要为监听套接字关联连接应答处理器。
  • 为了接收客户端传来的命令请求, 服务器要为客户端套接字关联命令请求处理器。
  • 为了向客户端返回命令的执行结果, 服务器要为客户端套接字关联命令回复处理器。
  • 当主服务器和从服务器进行复制操作时, 主从服务器都需要关联特别为复制功能编写的复制处理器。
  • 等等。

在这些事件处理器里面, 服务器最常用的要数与客户端进行通信的连接应答处理器、 命令请求处理器和命令回复处理器。

AE_READABLE 事件和 AE_WRITABLE 事件的概念是说,这个socket本身是可读的还是可写的,比如建立连接,读取指令就是readable事件,向socket写入返回结果,就是writable事件(感觉书里把它写复杂了

备注:

I/O 多路复用程序允许服务器同时监听套接字的 AE_READABLE 事件和 AE_WRITABLE 事件, 如果一个套接字同时产生了这两种事件, 那么文件事件分派器会优先处理 AE_READABLE 事件, 等到 AE_READABLE 事件处理完之后, 才处理 AE_WRITABLE 事件。

这也就是说, 如果一个套接字又可读又可写的话, 那么服务器将先读套接字, 后写套接字。

时间事件

时间事件是redis内部的另一种事件类型,时间事件可以实现定时或周期两种逻辑

服务器将所有时间事件都放在一个无序链表中,每当时间事件执行器运行时,它就遍历整个链表,查找所有已到达的时间事件,并调用相应的事件处理器。

上面讲到的处理持久化aof的逻辑,就是在一个叫serverCron的函数里执行的,而有一个时间事件就是执行serverCron函数。

持续运行的Redis服务器需要定期对自身的资源和状态进行检查和调整,从而确保服务器可以长期、稳定地运行,这些定期操作由redis.c/serverCron函数负责执行,它的主要工作包括:

更新服务器的各类统计信息,比如时间、内存占用、数据库占用情况等

清理数据库中的过期键值对。

关闭和清理连接失效的客户端

尝试进行AOF或RDB持久化操作

如果服务器是主服务器,那么对从服务器进行定期同步

如果处于集群模式,对集群进行定期同步和连接测

客户端

这里的客户端指的是在服务器内,每个连接的客户端都会被用redisClient结构体保存下来

客户端属性

1
2
> CLIENT list
id=-1498257551710864375 addr=9.137.217.100:51572 fd=96 name= cmd=ping age=99963 idle=21 proxy=3062a189cc80bde541b30efc7a5721c37c89df49
  • fd: 套接字描述符,普通客户端fd>0,伪客户端fd=-1,伪客户端处理的命令请求来源于 AOF 文件或者 Lua 脚本, 而不是网络, 所以这种客户端不需要套接字连接, 自然也不需要记录套接字描述符。
  • name:客户端名称,正常情况下客户端名称都是么有的,使用 CLIENT_SETNAME 命令可以为客户端设置一个名字
  • flags:客户端的身份和状态-具体以后再看文档吧
  • age:客户端的连接时间(s)

客户端命令执行

  1. 客户端发出的命令会被放到redisClient的输入缓冲区 querybuf,querybuf的大小是动态的,但是如果超过1个G客户端就会自动断开,but redis-cli 终端默认只能输入 4095 个字符,用脚手架倒是不会超过就是了
  2. 分析存在缓冲区的命令,并且把命令解析之后存在 argc 和 argv里面,其中argc负责记录argv的长度,argv[0] 默认是command,后面是按顺序输入的参数(这倒就是普通的读输入逻辑

  1. 根据argv[0] 的command,在命令表找到对应的redisCommand,并且把客户端的cmd指针指向对应的redisCommand,之后客户端就可以执行command命令

  1. 执行命令所得的命令回复会被保存在客户端状态的输出缓冲区里面,客户端的缓冲区有两种,固定大小缓冲区和可变大小缓冲区。简单来说就是固定大小缓冲区是自带默认的,大小默认是16kb。如果返回很大固定大小缓冲区塞不下就会用到可变大小缓冲区,可变大小缓冲区的数据结构是个链表

服务器

这个是抄书的一个,上面讲过的客户端发送命令到收到ok的大致过程:(大家都知道就是了

那么从客户端发送 SET KEY VALUE 命令到获得回复 OK 期间, 客户端和服务器共需要执行以下操作:

  1. 客户端向服务器发送命令请求 SET KEY VALUE 。
  2. 服务器接收并处理客户端发来的命令请求 SET KEY VALUE , 在数据库中进行设置操作, 并产生命令回复 OK 。
  3. 服务器将命令回复 OK 发送给客户端。
  4. 客户端接收服务器返回的命令回复 OK , 并将这个回复打印给用户观看。

中间的细节:不想写感觉除了上一节写的东西之外也就是多讲了一些细节,比如参数校验,身份验证还有一些执行之后的后续工作,有需要再看吧

总结

如果说上一个章节是介绍了redis在内存中保存的数据结构的话,这个章节介绍的就是redis服务器是怎么运行的,包括持久化存储逻辑,事件处理,客户端连接,处理客户端请求等(感觉这本书意外地写得挺清楚

redis学习笔记二:对象

Posted on 2022-12-07 | Edited on 2023-04-11 | In redis

前面介绍的所有的数据类型都是属于redis的底层数据结构,对象可以理解为是对底层数据结构的封装。

Redis 使用对象来表示数据库中的键和值,每次当我们在 Redis 的数据库中新创建一个键值对时, 我们至少会创建两个对象, 一个对象用作键值对的键(键对象), 另一个对象用作键值对的值(值对象)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct redisObject {

// 类型
unsigned type:4;

// 编码
unsigned encoding:4;

// 指向底层实现数据结构的指针
void *ptr;

// ...

} robj;

表:对象的类型

类型常量 对象的名称
REDIS_STRING 字符串对象(底层就是简单的字符串结构)
REDIS_LIST 列表对象
REDIS_HASH 哈希对象
REDIS_SET 集合对象
REDIS_ZSET 有序集合对象

编码

与此同时,对象只是一种数据类型对外的表征展示,同一个类型对象的底层数据结构可以是不一样的,这个底层结构的就是用编码encoding 来记录的。

对象的编码

编码常量 编码所对应的底层数据结构
REDIS_ENCODING_INT long 类型的整数
REDIS_ENCODING_EMBSTR embstr 编码的简单动态字符串
REDIS_ENCODING_RAW 简单动态字符串
REDIS_ENCODING_HT 字典
REDIS_ENCODING_LINKEDLIST 双端链表
REDIS_ENCODING_ZIPLIST 压缩列表
REDIS_ENCODING_INTSET 整数集合
REDIS_ENCODING_SKIPLIST 跳跃表和字典

每种类型的对象都至少使用了两种不同的编码, 下面的表列出了每种类型的对象可以使用的编码。

不同类型和编码的对象

类型 编码 对象
REDIS_STRING REDIS_ENCODING_INT 使用整数值实现的字符串对象。
REDIS_STRING REDIS_ENCODING_EMBSTR 使用 embstr 编码的简单动态字符串实现的字符串对象。
REDIS_STRING REDIS_ENCODING_RAW 使用简单动态字符串实现的字符串对象。
REDIS_LIST REDIS_ENCODING_ZIPLIST 使用压缩列表实现的列表对象。
REDIS_LIST REDIS_ENCODING_LINKEDLIST 使用双端链表实现的列表对象。
REDIS_HASH REDIS_ENCODING_ZIPLIST 使用压缩列表实现的哈希对象。
REDIS_HASH REDIS_ENCODING_HT 使用字典实现的哈希对象。
REDIS_SET REDIS_ENCODING_INTSET 使用整数集合实现的集合对象。
REDIS_SET REDIS_ENCODING_HT 使用字典实现的集合对象。
REDIS_ZSET REDIS_ENCODING_ZIPLIST 使用压缩列表实现的有序集合对象。
REDIS_ZSET REDIS_ENCODING_SKIPLIST 使用跳跃表和字典实现的有序集合对象。

Redis 可以根据不同的使用场景来为一个对象设置不同的编码, 从而优化对象在某一场景下的效率。

就比如上面介绍压缩列表时提到的,在数据量不大的背景下(查找效率影响不大),为了优化存储空间就会被处理成压缩列表。

字符串对象

虽然名字叫字符串对象,但是实际上是简单赋值对象,毕竟字符串对象的编码可以是int 、 raw 或者 embstr。

浮点数也就是 long double 在redis底层也是存的字符串(也就是后两个),读写的时候都分别转一下。

int

顺便来说一下对象结构的应用,如果执行了

1
redis> SET KEY 10000

那么字符串对象会将整数值保存在字符串对象结构的 ptr属性里面(将 void* 转换成 long ), 并将字符串对象的编码设置为 int 。当然这里的long看起来不是redis自定义的数据结构而是单纯的long类型数据结构

raw

如果字符串对象保存的是一个字符串值, 并且这个字符串值的长度大于 39 字节, 那么字符串对象将使用一个简单动态字符串(SDS)来保存这个字符串值, 并将对象的编码设置为 raw 。

看一下这下这个对象长啥样

嘛感觉还是非常生动形象的。

embstr

如果字符串对象保存的是一个字符串值, 并且这个字符串值的长度小于等于 39 字节, 那么字符串对象将使用 embstr 编码的方式来保存这个字符串值。

本质上来说embstr也是和前面的压缩列表一样利用连续地址空间节约内存的东西(看得出对redis来说内存有多珍贵了)

embstr和raw的区别:

raw分配内存的时候,分别初始化redisObj和sdshdr,然后分配两块地址空间

而embstr分配内存时,一口气初始化redisObj和sdshdr,初始化成一块连续的地址空间。

嘛至于文档里说的调用和释放少掉用一次函数这种事情并不是重点啦

(这段是抄的但是讲的很有道理)至于为什么是39,这个讲起来就比较复杂了,我就慢点说。
embstr是一块连续的内存区域,由redisObject和sdshdr组成。其中redisObject占16个字节,当buf内的字符串长度是39时,sdshdr的大小为8+39+1=48,那一个字节是’\0’。加起来刚好64。是不是发现了什么?

编码转换

  1. int -> raw 当本来被设置为int的value变成字符串之后,就要进行编码转换
  2. embstr -> raw embstr是个只读的,所以只要对embstr进行了编辑,就会变成raw

同时 embstr 是只读的,也就是说其他数据类型不能转embstr,

字符串应该是使用率最高的东西了(我觉得)

列表对象

ziplist 和 linkedlist

一共有 ziplist 和 linkedlist两种

怎么看ziplist都是压缩列表的样子

linkedlist就是列表,在redis里是个双端列表的数据结构

不如在这里把文档里的例子都给一下,正好方便理解前面的东西(其实就是抄书)

Key: numbers value : 1 “three” 5

压缩列表

链表

编码转换

当列表对象可以同时满足以下两个条件时, 列表对象使用 ziplist 编码:

  1. 列表对象保存的所有字符串元素的长度都小于 64 字节;
  2. 列表对象保存的元素数量小于 512 个;

不能满足这两个条件的列表对象需要使用 linkedlist 编码。

这俩是可以在配置文件里配置的

哈希对象

ziplist 和 hashtable

先说hashtable吧,看起来就无比正常,底层是前面的字典结构,这里书上是这么写的:

  • 字典的每个键都是一个字符串对象, 对象中保存了键值对的键;
  • 字典的每个值都是一个字符串对象, 对象中保存了键值对的值。

根据前面看过的字典可以知道,dict->dictht->[]dictEntry,一个dictEntry里保存一个键值对,这个键是个指针,值可以是整数也可以是指针,书上的描述的意思应该就是每个键和值都指向了一个字符串对象的意思,嗯应该是的。

ziplist也就是压缩列表,数组的压缩列表很好理解嘛一个一个压起来,每个节点存个长度之类的东西就行。但是哈希有键值对的概念,所以就是先放key再放value的顺序压起来

  • 保存了同一键值对的两个节点总是紧挨在一起, 保存键的节点在前, 保存值的节点在后;
  • 先添加到哈希对象中的键值对会被放在压缩列表的表头方向, 而后来添加到哈希对象中的键值对会被放在压缩列表的表尾方向。

找肯定也是遍历找嘛这个不用怀疑,就不会和字典一样可以根据hash算法来加快找的速度

编码转换

用ziplist的条件和前面的数组一样,只要不满足条件就会从ziplist变成hashtable

集合对象

intset 和 hashtable

intset也就是前面的整数集合,这个比较好理解吧就是一堆整数集合。

hashtable就很神奇了,键值对里只有key是作为存储的,值则被置为null。不过仔细想想也很好理解,利用hashtable作为字段在不在set里的查找逻辑嘛,平时自己不也是这么写的

编码转换

当集合对象可以同时满足以下两个条件时, 对象使用 intset 编码:

  1. 集合对象保存的所有元素都是整数值;
  2. 集合对象保存的元素数量不超过 512 个;

不能满足这两个条件的集合对象需要使用 hashtable 编码。

当然了那个512也是个配置

有序集合

ziplist 和 skiplist

跳表在前面看数据结构的时候就提过是用来处理有序集合的,可以利用跳表提高查找效率。

实际上一个skiplist包含一个跳表和一个字典

skiplist 编码的有序集合对象使用 zset 结构作为底层实现, 一个 zset 结构同时包含一个字典和一个跳跃表:

1
2
3
4
5
6
7
typedef struct zset {

zskiplist *zsl;

dict *dict;

} zset;

跳表本身没什么好说的,改存啥就存啥

字典就比较神奇了,key:每个节点的值,value:每个节点的score

值得一提的是, 虽然 zset 结构同时使用跳跃表和字典来保存有序集合元素, 但这两种数据结构都会通过指针来共享相同元素的成员和分值, 所以同时使用跳跃表和字典来保存集合元素不会产生任何重复成员或者分值, 也不会因此而浪费额外的内存。

确实值得一提,不然肯定有人(比如我)会问在redis这种内存大过天的地方怎么会舍得分两份内存出来照顾有序集合的

但是为什么要用两个数据结构存有序集合?

ZRANGE 这种按顺序拿值的函数,那用跳表就ok了 O(nlogn)

ZSCORE 是需要根据值来获取score O(1)

嘛这样就可以理解了

压缩列表也没啥好说的,就是比较小的时候挨个排一排存起来,第一个是值第二个是score

编码转换

当有序集合对象可以同时满足以下两个条件时, 对象使用 ziplist 编码:

  1. 有序集合保存的元素数量小于 128 个;
  2. 有序集合保存的所有元素成员的长度都小于 64 字节;

类型检查

背景,redis很多操作都是针对特定对象执行的,比如set对字符串对象,hset则是对哈希对象执行,类型检查就是为了确保特定的类型执行特定的指令。

类型特定命令所进行的类型检查是通过 redisObject 结构的 type 属性来实现的:

  • 在执行一个类型特定命令之前, 服务器会先检查输入数据库键的值对象是否为执行命令所需的类型, 如果是的话, 服务器就对键执行指定的命令;
  • 否则, 服务器将拒绝执行命令, 并向客户端返回一个类型错误。

多态

其实就是不同编码支持同一个函数的情况下,这个指令就被认为支持多态,比如linkedList和zipList

内存回收

redis利用引用计数来实现内存回收。

1
2
3
4
5
6
7
8
9
10
typedef struct redisObject {

// ...

// 引用计数
int refcount;

// ...

} robj;

对象的引用计数信息会随着对象的使用状态而不断变化:

  • 在创建一个新对象时, 引用计数的值会被初始化为 1 ;
  • 当对象被一个新程序使用时, 它的引用计数值会被增一;
  • 当对象不再被一个程序使用时, 它的引用计数值会被减一;
  • 当对象的引用计数值变为 0 时, 对象所占用的内存会被释放。

对象共享

可是为什么这个对象会被多个程序引用呢,这就涉及到了对象共享逻辑。

比如你创建一个值为10的字符串对象,key a和key b对应的值都是10的时候,他们会指向同一个字符串对象:10。

说白了也就是为了节约内存。

目前来说, Redis 会在初始化服务器时, 创建一万个字符串对象, 这些对象包含了从 0 到 9999 的所有整数值, 当服务器需要用到值为 0到 9999 的字符串对象时, 服务器就会使用这些共享对象, 而不是新创建对象。

当然这个数量也可以通过配置来修改

可以用 OBJECT REFCOUNT 来查看改对象引用量

比较有意思的是,目前Redis 只对包含整数值的字符串对象进行共享

简单来说原因就是时间复杂度->cpu耗时,整数匹配的时间复杂度为O(1) 而 字符串匹配的时间复杂度为 O(n)

而如果是一个哈希对象共享的话则会更加复杂,要判断键是否一致,再判断每个键下面的值是否一致,书上算出来的是O(n^2),但是我总觉得是O(kn)

而且字符串被重复利用比例和整数相比要低很多

空转时长

用到的是redisobject里面的lru 属性

1
2
3
4
5
6
7
8
9
typedef struct redisObject {

// ...

unsigned lru:22;

// ...

} robj;

lru这个名字一出来就是老熟人了,lru记录了对象最后一次被命令程序访问的时间。用 OBJECT IDLETIME 命令可以打印出该对象的空转时长,也就是当前时间-lru

键的空转时长还有另外一项作用: 如果服务器打开了 maxmemory 选项, 并且服务器用于回收内存的算法为 volatile-lru 或者 allkeys-lru , 那么当服务器占用的内存数超过了 maxmemory 选项所设置的上限值时, 空转时长较高的那部分键会优先被服务器释放, 从而回收内存。

12…4
Nina

Nina

37 posts
19 categories
40 tags
© 2024 Nina
Powered by Hexo v3.9.0
|
Theme — NexT.Pisces v6.3.0