微信悬浮二维码
微信扫一扫
关注更多Golang内幕
go微服务系列-Go语言社区

go微服务系列

版权声明:本文来源CSDN,博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/mergerly/article/details/84840606

hey,好久没有发博客了,最近换了新东家有点忙(其实是忙着学习,不是工作,毕竟为了money放弃了较擅长的java投向了go),很多要学的;这是在新公司接触微服务框架(micro框架)后的总结,在未来,我将会较深入的研究go和微服务,之后也会陆陆续续的发布一些文章,如果想直接查阅源码或者通过demo学习的,可以访问ricoder_demo

如何编写一个微服务?这里用的是go的微服务框架go micro,具体的情况可以查阅:http://btfak.com/%E5%BE%AE%E6%9C%8D%E5%8A%A1/2016/03/28/go-micro/

这里给出的是开发一个微服务的步骤:

1、书写proto文件,定义函数等
具体实现:

syntax = "proto3";

package pb;


service UserService {
    //增
    rpc InsertUser (InsertUserReq) returns (InsertUserRep){}
    //删
    rpc DeletetUser (DeletetUserReq) returns (DeletetUserRep){}
    //查
    rpc SelectUser(SelectUserReq)returns (SelectUserRep){}
    //改
    rpc ModifyUser(ModifyUserReq)returns (ModifyUserRep){}
}

message User{
    int32 id = 1 ;
    string name = 2;
    string Address = 3;
    string Phone = 4;
}

message ModifyUserReq {
    int32 id = 1 ;
    string name = 2;
    string Address = 3;
    string Phone = 4;
}

message ModifyUserRep {
}

message SelectUserReq {
    int32 id = 1 ;
}

message SelectUserRep {
    User users = 1;
}

message DeletetUserReq {
    int32 id = 1 ;
}

message DeletetUserRep {
}

message InsertUserReq {
    int32 id = 1 ;
    string name = 2;
    string Address = 3;
    string Phone = 4;
}

message InsertUserRep {
    int32 id = 1 ;
    string name = 2;
    string Address = 3;
    string Phone = 4;
}

2、采用代码生成工具生成user.pb.go文件,生成协议

具体可以查看 https://github.com/micro/go-micro ,这里我自己写了个脚本文件 build_proto.sh ,自动将指定文件夹下的proto文件生成相应的协议,具体代码如下:

#!/usr/bin/env bash
protoc --proto_path=./proto --go_out=plugins=micro:./src/share/pb ./proto/*.proto

可以对代码进行自定义修改 … … …

运行 build_proto.sh 文件后可以看到在指定文件夹下生成了相应的 user.pb.go 文件,在我这里如:

Screenshot from 2017-10-11 17-09-09.png

3、书写一个handler实现user.pb.go定义的接口

具体实现:
1)先创建一个user.go文件
2)定义一个结构体,命名为UserHandler,实现user.proto文件所有定义的service,在这里要注意一点,即使是暂时没有实现好业务,也有给个空实现,代码如下:

package handler

import(
    "mewe_job/GoMicroDemo/src/share/pb"
    "golang.org/x/net/context"
)

type UserHandler struct {

}

// new一个UserHandler
func NewUserHandler() *UserHandler{
    return &UserHandler{}
}

// 增
func (c *UserHandler) InsertUser(ctx context.Context, req * pb.InsertUserReq,rsp *pb.InsertUserRep)error {

    return nil
}

// 删
func (c *UserHandler) DeletetUser(ctx context.Context, req * pb.DeletetUserReq,rsp *pb.DeletetUserRep)error {

    return nil
}

// 查
func (c *UserHandler) SelectUser(ctx context.Context, req * pb.SelectUserReq,rsp *pb.SelectUserRep)error {

    return nil
}

//改
func (c *UserHandler) ModifyUser(ctx context.Context, req * pb.ModifyUserReq,rsp *pb.ModifyUserRep)error {

    return nil
}

4、将handler注册进微服务,这一步在main中实现
具体实现:

package main

import (
    "github.com/micro/cli"
    "mewe_job/GoMicroDemo/src/share/pb"
    "github.com/micro/go-micro/server"
    "mewe_job/GoMicroDemo/src/user-srv/handler"
    "github.com/micro/go-micro"
    "log"
    "mewe_job/GoMicroDemo/src/user-srv/db"
    "mewe_job/GoMicroDemo/src/share/config"
)

func main() {


    // 创建Service,并定义一些参数
    service := micro.NewService(
        micro.Name("go.micro.srv.user"),
        micro.Version("latest"),
    )

    // 定义Service动作操作
    service.Init(
        micro.Action(func(c *cli.Context) {
            log.Println("micro.Action test ...")
            // 先注册db
            db.Init(config.MysqlDSN)
            pb.RegisterUserServiceHandler(service.Server(), handler.NewUserHandler(), server.InternalHandler(true))
        }),
        micro.AfterStop(func() error {
            log.Println("micro.AfterStop test ...")
            return nil
        }),
        micro.AfterStart(func() error {
            log.Println("micro.AfterStart test ...")
            return nil
        }),
    )

    log.Println("启动user-srv服务 ...")

    //启动service
    if err := service.Run(); err != nil {
        log.Panic("user-srv服务启动失败 ...")
    }
}

这段代码主要的点有:

  • 创建service,通过以下代码可以初始化一个名叫 go.micro.srv.user 的微服务

      // 创建Service,并定义一些参数
      service := micro.NewService(
          micro.Name("go.micro.srv.user"),
          micro.Version("latest"),
      )
    
  • 注册db连接和给go.micro.srv.user这个微服务绑定handler,虽然目前我还没有在db中定义db层的操作

    db.Init(config.MysqlDSN)
    pb.RegisterUserServiceHandler(service.Server(), handler.NewUserHandler(), server.InternalHandler(true))

  • 启动service,通过Run开启

    if err := service.Run(); err != nil {
    log.Panic(“user-srv服务启动失败 …”)
    }

5、现在就可以通过 go run main.go --registry=mdns 启动该服务了,之所以携带 --registry=mdns 是因为我本地ubuntu系统没有安装consul实现服务发现,所以就采用了gomicro官方推荐的方法。

Screenshot from 2017-10-11 17-42-03.png

6、到这一步客户端还无法访问到服务,需要做些处理,我这里是加了个web服务,再将客户端的请求进行转发,main函数实现如下:

func main() {

/*  方案一:
    mux := http.NewServeMux()
    mux.HandleFunc("/", handleRPC)
    log.Println("Listen on :8082")
    http.ListenAndServe(":8082", mux)*/
/*  方案二  */
    service := web.NewService(
        web.Name(config.ServicePrefix+".web"),
    )

    service.HandleFunc("/", handleRPC)

    if err := service.Init(); err != nil {
        log.Fatal(err)
    }

    if err := service.Run(); err != nil {
        log.Fatal(err)
    }
}

这里主要的函数是handleRPC这个函数,由于代码量偏多,具体实现可以查看源码。这里如果使用consul实现了服务发现,也可以通过方案一进行实现,这样的话web服务的端口还是固定的。

7、开启web服务

$ go run web.go --registry=mdns
Listening on [::]:36859

8、到这一步客户端就可以通过web服务端口和接口以及参数访问user这个微服务了,访问链接:(这里安利postman,一个谷歌的插件,超级好用 … )

http://127.0.0.1:36859/user/selectUser

tip:该项目的源码(包含数据库的增删查改的demo)可以查看 源代码

这是使用gomicro开发微服务系列的第二篇,在上一篇中我只是使用了user-srv和web-srv实现了一个demo,在这里我将实用consul实现服务发现。如果想直接查阅源码或者通过demo学习的,可以访问ricoder_demo

如何编写一个微服务?这里用的是go的微服务框架go micro,具体的情况可以查阅:http://btfak.com/%E5%BE%AE%E6%9C%8D%E5%8A%A1/2016/03/28/go-micro/

一、如何安装consul

我的开发系统采用的是ubunt16.04,这里就给出ubuntu下安装consul的步骤:

$ wget https://releases.hashicorp.com/consul/0.7.2/consul_0.7.2_linux_amd64.zip
$ sudo apt-get install unzip

$ ls

$ unzip consul_0.7.2_linux_amd64.zip
$ sudo mv consul /usr/local/bin/consul

$ wget https://releases.hashicorp.com/consul/0.7.2/consul_0.7.2_web_ui.zip
$ unzip consul_0.7.2_web_ui.zip
$ mkdir -p /usr/share/consul
$ mv dist /usr/share/consul/ui

Consul 压缩包地址:https://www.consul.io/downloads.html

验证安装是否成功的方法:

$ consul
Usage: consul [--version] [--help] <command> [<args>]

Available commands are:
    agent          Runs a Consul agent
    catalog        Interact with the catalog
    event          Fire a new event
    exec           Executes a command on Consul nodes
    force-leave    Forces a member of the cluster to enter the "left" state
    info           Provides debugging information for operators.
    join           Tell Consul agent to join cluster
    keygen         Generates a new encryption key
    keyring        Manages gossip layer encryption keys
    kv             Interact with the key-value store
    leave          Gracefully leaves the Consul cluster and shuts down
    lock           Execute a command holding a lock
    maint          Controls node or service maintenance mode
    members        Lists the members of a Consul cluster
    monitor        Stream logs from a Consul agent
    operator       Provides cluster-level tools for Consul operators
    reload         Triggers the agent to reload configuration files
    rtt            Estimates network round trip time between nodes
    snapshot       Saves, restores and inspects snapshots of Consul server state
    validate       Validate config files/directories
    version        Prints the Consul version
    watch          Watch for changes in Consul

启动consul服务的方法:

$ consul agent -dev
==> Starting Consul agent...
==> Consul agent running!
           Version: 'v0.9.3'
           Node ID: '199ee0e9-db61-f789-b22a-b6b472f63fbe'
         Node name: 'ricoder'
        Datacenter: 'dc1' (Segment: '<all>')
            Server: true (Bootstrap: false)
       Client Addr: 127.0.0.1 (HTTP: 8500, HTTPS: -1, DNS: 8600)
      Cluster Addr: 127.0.0.1 (LAN: 8301, WAN: 8302)

优雅的停止服务的方法:

命令:CTRL+C

其他命令:

  • consul members:查看集群成员
  • consul info:查看当前服务器的状况
  • consul leave:退出当前服务集群

成功开启consul服务后可以登录后台访问地址:http://localhost:8500,如下:

Screenshot from 2017-10-14 13-35-58.png

二、api-srv的开发,实现动态路由

根据官方对api-srv的介绍:The micro api is an API gateway for microservices. Use the API gateway pattern to provide a single entry point for your services. The micro api serves HTTP and dynamically routes to the appropriate backend service.
粗略翻译的意思就是:api-srv是微服务的网关,使用API网关模式可以为我们的服务提供一个入口,api-srv提供HTTP服务,并动态路由到相应的后端服务。

步骤1:监听8082端口,并绑定handler处理http请求

    mux := http.NewServeMux()
    mux.HandleFunc("/", handleRPC)
    log.Println("Listen on :8082")
    http.ListenAndServe(":8082", mux)

步骤2:实现handler,并实现跨域处理

    if r.URL.Path == "/" {
        w.Write([]byte("ok,this is the server ..."))
        return
    }

    // 跨域处理
    if origin := r.Header.Get("Origin"); cors[origin] {
        w.Header().Set("Access-Control-Allow-Origin", origin)
    } else if len(origin) > 0 && cors["*"] {
        w.Header().Set("Access-Control-Allow-Origin", origin)
    }

    w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
    w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-Token, X-Client")
    w.Header().Set("Access-Control-Allow-Credentials", "true")
    if r.Method == "OPTIONS" {
        return
    }

    if r.Method != "POST" {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }

步骤3:实现将url转换为service和method,这里我采用了pathToReceiver这个函数来处理

    p = path.Clean(p)
    p = strings.TrimPrefix(p, "/")
    parts := strings.Split(p, "/")

    // If we've got two or less parts
    // Use first part as service
    // Use all parts as method
    if len(parts) <= 2 {
        service := ns + strings.Join(parts[:len(parts)-1], ".")
        method := strings.Title(strings.Join(parts, "."))
        return service, method
    }

    // Treat /v[0-9]+ as versioning where we have 3 parts
    // /v1/foo/bar => service: v1.foo method: Foo.bar
    if len(parts) == 3 && versionRe.Match([]byte(parts[0])) {
        service := ns + strings.Join(parts[:len(parts)-1], ".")
        method := strings.Title(strings.Join(parts[len(parts)-2:], "."))
        return service, method
    }

    // Service is everything minus last two parts
    // Method is the last two parts
    service := ns + strings.Join(parts[:len(parts)-2], ".")
    method := strings.Title(strings.Join(parts[len(parts)-2:], "."))
    return service, method

http传进来的url是http://127.0.0.1:8082/user/userService/SelectUser,我在handler中通过以下方式调用后:

service, method := apid.PathToReceiver(config.Namespace, r.URL.Path)

service和method分别是:

2017/10/14 13:56:12 service:com.class.cinema.user
2017/10/14 13:56:12 method:UserService.SelectUser

注意:var config.Namespace = “com.class.cinema”

步骤4:封装request,调用服务

    br, _ := ioutil.ReadAll(r.Body)

    request := json.RawMessage(br)

    var response json.RawMessage
    req := (*cmd.DefaultOptions().Client).NewJsonRequest(service, method, &request)
    ctx := apid.RequestToContext(r)
    err := (*cmd.DefaultOptions().Client).Call(ctx, req, &response)

在这里Call就是调用相应服务的关键。

步骤5:对err进行相应的处理和返回调用结果

// make the call
    if err != nil {
        ce := microErrors.Parse(err.Error())
        switch ce.Code {
        case 0:
            // assuming it's totally screwed
            ce.Code = 500
            ce.Id = service
            ce.Status = http.StatusText(500)
            // ce.Detail = "error during request: " + ce.Detail
            w.WriteHeader(500)
        default:
            w.WriteHeader(int(ce.Code))
        }
        w.Write([]byte(ce.Error()))
        return
    }
    b, _ := response.MarshalJSON()
    w.Header().Set("Content-Length", strconv.Itoa(len(b)))
    w.Write(b)

通过对err的处理,在请求的method或者service不存在时,如:

Screenshot from 2017-10-14 14-05-28.png

会有相应的错误信息提示返回到客户端。

三、跑起服务,查看效果

步骤1:首先要先跑起consul服务发现机制,这样后期加入的服务才可以被检测到,如:

Screenshot from 2017-10-14 14-34-13.png

步骤2:跑起user-srv服务,如:

Screenshot from 2017-10-14 14-35-36.png

登录consul后台,查看服务是否被发现:

Screenshot from 2017-10-14 14-36-41.png

可以从中看到多了一个com.class.cinema.user这个服务

步骤3:通过postman访问user-srv服务

Screenshot from 2017-10-14 14-39-37.png

可以看到在Body处有数据显示出来了,再看看服务后台的日志输出

Screenshot from 2017-10-14 14-40-12.png

Screenshot from 2017-10-14 14-40-29.png

由上面两个图可以看出来,客户端的请求到达了api-srv,再通过api-srv到达了user-srv。

注意:此处的url的书写曾经遇见过一个bug,那就是我第一次书写成了 http://127.0.0.1:8082/user/SelectUser,导致出现这种异常:

Screenshot from 2017-10-14 14-44-35.png

在前两篇系列博文中,我已经实现了user-srv、web-srv、api-srv,在新的一篇博文中,我要讲解的是如何在项目中如何使用redis存储session。如果想直接查阅源码或者通过demo学习的,可以访问ricoder_demo

如何编写一个微服务?这里用的是go的微服务框架go micro,具体的情况可以查阅:http://btfak.com/%E5%BE%AE%E6%9C%8D%E5%8A%A1/2016/03/28/go-micro/

一、构建user-status.srv

1.1 构建UserStatus.proto

定义User的状态操作函数,部分源码如下:

syntax = "proto3";

package pb;

service UserStatus {
//通过uid获取session
rpc GetSessionByUID(GetSessionByUIDReq) returns (GetSessionByUIDRep) {}
//通过token获取session
rpc GetSessionByToken(GetSessionByTokenReq) returns (GetSessionByTokenRep) {}
//获取用户的长连接地址
rpc GetConnectorAddr(GetConnectorAddrReq) returns (GetConnectorAddrRep) {}
//更新用户长连接地址(用户建立长连接时调用)
rpc UpdateConnectorAddr(UpdateConnectorAddrReq) returns (UpdateConnectorAddrRep) {}
//构建session用户登录时调用,此接口会清除旧session
rpc NewSession(NewSessionReq) returns (NewSessionRep) {}
//移除session登出时会调用
rpc RemoveSession(RemoveSessionReq) returns (RemoveSessionRep) {}
//token续期
rpc RefreshSession(RefreshSessionReq) returns (RefreshSessionRep) {}
//更新用户长连接地址(用户建立长连接时调用)
rpc UserConnected(UserConnectedReq) returns (UserConnectedRep) {}
//删除用户的长连接地址(用户长连接断开时调用)
rpc UserDisonnected(UserDisonnectedReq) returns (UserDisonnectedRep) {}
//通过uid来移除session
rpc RemoveSessionByUID(RemoveSessionByUIDReq) returns (RemoveSessionByUIDRep) {}
//通过token找uid
rpc GetUserIDByToken(GetUserIDByTokenReq) returns (GetUserIDByTokenRep) {}
}
/*
还有一些定义,完整示例可以查看源码~
*/
1.2 运行脚本build_proto.sh自动构建userStatus.pb.go
$ bash ./build_proto.sh

这个build_proto.sh是我自己构建的一个脚本文件,运行之后会在/src/share/pb/文件夹下面生成一个userStatus.pb.go文件

1.3 构建handler,实现userStatus中的函数

我在src文件夹下面添加一个user-status-srv文件夹,并在里边添加一个handler文件夹和utils文件夹,一个存放handler文件,一个存放工具类函数,然后实现handler函数,源码如下:

package handler

import (
    //多个导入包,具体请查看源码
)
type UserStatusHandler struct {
    pool               *redis.Pool
    logger             *zap.Logger
    namespace          string
    sessionExpire int
    tokenExpire int
}

func NewUserStatusHandler(pool *redis.Pool) *UserStatusHandler {
    return &UserStatusHandler{
        pool: pool,
        sessionExpire: 15 * 86400,
        tokenExpire:   15 * 86400,
    }
}

//GetUserIDByToken GetUIDByToken
func (s *UserStatusHandler) GetUserIDByToken(ctx context.Context, req *pb.GetUserIDByTokenReq, rsp *pb.GetUserIDByTokenRep) error {
    return nil  
}
/*
还有其他函数的实现,完整示例可以查看源码~
*/

这里实现的函数全部先采用空实现,在后面会慢慢添加

1.4 实现main函数,启动service

源码如下:

package main

import (
    //多个导入包,具体查看完整源码
)

func main() {

    // 创建Service,并定义一些参数
    service := micro.NewService(
        micro.Name(config.Namespace+"userStatus"),
        micro.Version("latest"),
    )
    // 定义Service动作操作
    service.Init(
        micro.Action(func(c *cli.Context) {
            log.Println("micro.Action test ...")
            // 注册redis
            redisPool := share.NewRedisPool(3, 3, 1,300*time.Second,":6379","redis")
            // 先注册db
            db.Init(config.MysqlDSN)
            pb.RegisterUserStatusHandler(service.Server(), handler.NewUserStatusHandler(redisPool), server.InternalHandler(true))
        }),
        micro.AfterStop(func() error {
            log.Println("micro.AfterStop test ...")
            return nil
        }),
        micro.AfterStart(func() error {
            log.Println("micro.AfterStart test ...")
            return nil
        }),
    )

    log.Println("启动user-status-srv服务 ...")

    //启动service
    if err := service.Run(); err != nil {
        log.Panic("user-status-srv服务启动失败 ...")
    }
}

由源码可以看出,我在启动service之前先注册了redis、db以及绑定handler,再通过Run启动service。

1.5 查看consul

在浏览器打开 http://127.0.0.1:8500/ ,如果可以在页面中看到对应的srv,则说明service启动成功。如:

Screenshot from 2017-10-17 18-04-06.png

二、使用redis

在这一章节中,我将采用redis实现数据的存取。

2.1 新建一个redis.Pool

在main.go函数中,我使用 *_share.NewRedisPool(3, 3, 1,300_time.Second,":6379",“redis”) 得到了一个redisPool,NewRedisPool源码如下:

func NewRedisPool(maxIdle, maxActive , DBNum int, timeout time.Duration, addr , password string) *redis.Pool {

    return &redis.Pool{
        MaxActive:   maxActive,
        MaxIdle:     maxIdle,
        IdleTimeout: timeout,
        Wait:        true,
        Dial: func() (redis.Conn, error) {
            // return redis.DialURL(rawurl)
            // return redis.Dial("tcp", addr, redis.DialPassword(password), redis.DialDatabase(dbNum))
            return redis.Dial("tcp", addr, redis.DialPassword(password), redis.DialDatabase(DBNum))
        },
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            _, err := c.Do("PING")
            return err
        },
    }
}

在这里我使用的是第三方开源框架,有兴趣的可以查看 https://github.com/garyburd/redigo 了解情况。

2.2 使用redis存取数据

在这里我以NewSession为例,源码如下:

func (s *UserStatusHandler) NewSession(ctx context.Context, req *pb.NewSessionReq, rsp *pb.NewSessionRep) error {
    var oldSession *pb.Session
    defer func() {
        utils.SessionFree(oldSession)
    }()
    fieldMap := make(map[string]interface{}, 0)
    fieldMap["Uid"] = req.Id
    fieldMap["Address"] = req.Address
    fieldMap["Phone"] = req.Phone
    fieldMap["Name"] = req.Name
    //生成Token
    token, err := utils.NewToken(req.Id)
    if err != nil {
        log.Println("生成token失败", zap.Error(err), zap.Int32("uid", req.Id))
        return err
    }

    //删除所有旧token
    if err = utils.RemoveUserSessions(req.Id, s.pool); err != nil {
        log.Println("删除所有旧token失败", zap.Error(err), zap.Int32("uid", req.Id))
        return err
    }
    conn := s.pool.Get()
    //会话数据写入redis,格式:t:id => map的哈希值
    if _, err := conn.Do("HMSET", redis.Args{}.Add(utils.KeyOfSession(req.Id)).AddFlat(fieldMap)...); err != nil {
        conn.Close()
        log.Println("会话数据写入redis失败", zap.Error(err), zap.String("key", utils.KeyOfSession(req.Id)), zap.Any("参数", fieldMap))
        return err
    }
    //设置t:id的过期时间
    if _, err := conn.Do("EXPIRE", utils.KeyOfSession(req.Id), s.sessionExpire); err != nil {
        conn.Close()
        s.logger.Error("设置session过期时间失败", zap.Error(err), zap.String("key", utils.KeyOfSession(req.Id)))
        return err
    }

    //用户token写入set里边,格式:t:uid:set:id => token
    keyOfSet := utils.KeyOfSet(req.Id)
    if _, err = conn.Do("SADD", keyOfSet, token); err != nil {
        conn.Close()
        log.Println("token写入用户集合失败", zap.Error(err), zap.String("key", keyOfSet), zap.String("参数", token))
        return err
    }
    //设置t:uid:set:id的过期时间
    if _, err = conn.Do("EXPIRE", keyOfSet, s.sessionExpire); err != nil {
        conn.Close()
        log.Println("设置用户token集合过期时间失败", zap.Error(err), zap.String("key", keyOfSet))
        return err
    }

    //将token和id对应,格式:token => id
    if _, err = conn.Do("SETEX", utils.KeyOfToken(token), s.tokenExpire, req.Id); err != nil {
        conn.Close()
        log.Println("token写入redis失败", zap.Error(err), zap.String("key", utils.KeyOfToken(token)), zap.Int32("参数", req.Id))
        return err
    }

    rsp.Token = token
    return nil
}

如代码所示,操作redis的步骤是 conn := s.pool.Get() 先开启一个连接,再通过conn.Do(“EXPIRE”, keyOfSet, s.sessionExpire) 的一种方式操作redis中的数据,具体的可以查看redis的api,这里有个函数 utils.SessionFree(oldSession) ,这是我在utils包下自定义的一个函数,这个知识点再接下来的知识点中会有涉及。

三、额外讲解sync.Pool

我在项目中使用了sync.pool存储session对象,目的是为了保存和复用session这个临时对象,以减少内存分配,减低gc压力,那么sync.Pool是什么呢?以下是官方给出的解释(自己翻译的):

  • Pool是一个可以存取临时对象的集合。
  • Pool中保存的item都可能在没有任何通知的情况下被自动释放掉,即如果Pool持有该对象的唯一引用,这个item就可能被回收。
  • Pool在被多个线程使用的情况下是安全的。
  • Pool的目的是缓存分配了但是未使用的item用于之后的重用,以减轻GC的压力。也就是说,pool让创建高效的并且线程安全的空闲列表更加容易,不过Pool并不适用于所有空闲列表。
  • Pool的合理用法是用于管理一组被多个独立并发线程共享并可能重用的临时item。Pool提供了让多个线程分摊内存申请消耗的方法。
  • Pool比较经典的一个例子在fmt包里,该Pool维护一个动态大小的临时输出缓存仓库,该仓库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小。
  • 另一方面,管理着短寿命对象的空闲列表不适合使用Pool,因为这种情况下内存申请消耗不能很好的分配。这时应该由这些对象自己实现空闲列表。

以下是Pool的数据类型:

type Pool struct {
    noCopy noCopy

    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    localSize uintptr        // size of the local array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() interface{}
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
    private interface{}   // Can be used only by the respective P.
    shared  []interface{} // Can be used by any P.
    Mutex                 // Protects shared.
}

type poolLocal struct {
    poolLocalInternal

    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

由注释我们也可以看出,其中的local成员的真实类型是一个poolLocal数组,而localSize是数组长度,poolLocal是真正保存数据的地方。private保存了一个临时对象,shared是保存临时对象的数组,而从private和shared的注释我们也可以看出,一个是属于特定的P私有的,一个是属于所有的P,至于这个P是什么,可以自行参考golang的调度模型,后期我也会专门写一篇相关的博客。其次,Pool是给每个线程分配了一个poolLocal对象,就是说local数组的长度,就是工作线程的数量(size := runtime.GOMAXPROCS(0))。当多线程在并发读写的时候,通常情况下都是在自己线程的poolLocal中存取数据,而只有当自己线程的poolLocal中没有数据时,才会尝试加锁去其他线程的poolLocal中“偷”数据。

我们可以看看Get函数,源码如下:

func (p *Pool) Get() interface{} {
    if race.Enabled {
        race.Disable()
    }
    l := p.pin()
    x := l.private
    l.private = nil
    runtime_procUnpin()
    if x == nil {
        l.Lock()
        last := len(l.shared) - 1
        if last >= 0 {
            x = l.shared[last]
            l.shared = l.shared[:last]
        }
        l.Unlock()
        if x == nil {
            x = p.getSlow()
        }
    }
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}

这个函数的源码并不难读,在调用Get的时候首先会先在local数组中获取当前线程对应的poolLocal对象,然后再从poolLocal对象中获取private中的数据,如果private中有数据,则取出来直接返回。如果没有则先锁住shared,然后从shared中取出数据后直接返回,如果还是没有则调用getSlow函数。那么为什么这里要锁住shared呢?答案我们可以在getSlow中找到,因为当shared中没有数据的时候,会尝试去其他的poolLocal的shared中偷数据。

    // See the comment in pin regarding ordering of the loads.
    size := atomic.LoadUintptr(&p.localSize) // load-acquire
    local := p.local                         // load-consume
    // Try to steal one element from other procs.
    pid := runtime_procPin()
    runtime_procUnpin()
    for i := 0; i < int(size); i++ {
        l := indexLocal(local, (pid+i+1)%int(size))
        l.Lock()
        last := len(l.shared) - 1
        if last >= 0 {
            x = l.shared[last]
            l.shared = l.shared[:last]
            l.Unlock()
            break
        }
        l.Unlock()
    }
    return x

tip:该项目的源码(包含数据库的增删查改的demo)可以查看 源代码

转自:https://www.jianshu.com/p/9cb474dd451d

  • 发表于 2019-08-27 15:59
  • 阅读 ( 3 )

你可能感兴趣的文章

相关问题

推荐好书