背景:

gopkg.in/mgo.v2 停止维护了

github.com/globalsign/mgo 这个是社区中目前比较活跃的库 1437 stars,最后一次提交是 2018/10/15

兼容:

兼容gopkg.in/mgo.v2 接口,一般只需要改几个包名,对比mongodb 官方的client来说相对较好的做了向上兼容

新功能增加

  • 支持 mongodb readonly views in 3.4+ 为已经存在的collection 和 view 创建只读的视图

  • 支持 collations in 3.4+ 为一个collection 定制字符串比较规则 ,常用于基于字符串的排序

  • 支持 Change Streams in 3.6+ 可以获取数据的实时变化,而不必查询oplog

  • 支持queries read concern / write concern (控制集群下的读写安全级别表现)

    ss.SetSafe(&mgo.Safe{
        W: 1,
        WMode: "majority",
        RMode: "local",
        WTimeout: int(time.Second / time.Millisecond),
        J: true,
    })
    //ss.DB("test").C("").FindId(1).Apply()
  • 实现 MongoTimestamp bson 类型

  • 支持创建 partial index

功能优化

  • time.Time 默认使用 utc 时区, 修复了之前序列化/反序列化时使用的时区模版错误
// bug
const jdateFormat = "2006-01-02T15:04:05.999Z"
// fix
const jdateFormat = "2006-01-02T15:04:05.999Z07:00"
  • 支持使用map[int]type{} 这个类型在 bson 的编码解码中。

  • 提高 bson.Raw 类型的解码速度

  • bson 反射结构体(inline tag)指针支持

type inlinePtrStruct struct {
    A int
    *MStruct `bson:",inline"`
}
  • 在没有指明bson tag的情况下,使用json tag代替,如果json tag 和 bson tag 一致则不需要进行bson tag 声明

  • count 命令支持使用最大时间

q := ss.DB("test").C("test_col").FindId(1)
q.SetMaxTime(time.Second)
q.Count()
  • 添加接口可以舍弃所有的index
ss.DB("test").C("test_col").DropAllIndexes() 

connect 优化:

  • 避免粗暴关闭可能正在使用的请求
// CloseAfterIdle terminates an idle socket, which has a zero
// reference, or marks the socket to be terminate after idle.
func (socket *mongoSocket) CloseAfterIdle() {
    socket.Lock()
    if socket.references == 0 {
        socket.Unlock()
        socket.Close()
        logf("Socket %p to %s: idle and close.", socket, socket.addr)
        return
    }
    socket.closeAfterIdle = true
    socket.Unlock()
    logf("Socket %p to %s: close after idle.", socket, socket.addr)
}

mongoServer 如果不可用,则所有的socket 会被强制kill 造成正在所有在用的session 试图重新执行query,造成雪崩效应。

  • 降低了socket 写buffer中锁的粒度
    不在socket 的net write 方法上加锁,在网络堵塞的情况下不hold锁 merger request

  • 优化了并发条件下的socket回收:
    在并发的情况下,pool size 会加大,并且不会主动关闭socket,通过server.poolShrinker 函数,每隔一分钟获查看当前server下未使用的socket,如果超过 MinSocketPool并且单个socket闲置时间超过了 MaxIdleTimeMS 的大小,则回收socket

func (server *mongoServer) poolShrinker() {
    ticker := time.NewTicker(1 * time.Minute)
    for _ = range ticker.C {
        if server.closed {
            ticker.Stop()
            return
        }
        server.Lock()
        unused := len(server.unusedSockets)
        if unused  0 {
            next := make([]*mongoSocket, unused-end)
            copy(next, server.unusedSockets[end:])
            server.unusedSockets = next
            remainSockets := []*mongoSocket{}
            for _, s := range server.liveSockets {
                if _, ok := reclaimMap[s]; !ok {
                    remainSockets = append(remainSockets, s)
                }
            }
            server.liveSockets = remainSockets
            stats.conn(-1*end, server.info.Master)
        }
        server.Unlock()

        for _, s := range tbr {
            s.Close()
        }
    }
}
  • 优化在连接数达到上限的情况下, 提高获取新socket的响应速度
    使用sync.Cond sync.Boardcast 的方式,减少轮询的延时(本来是100ms轮询的),并且添加 PoolTimeout 配置项,管理获取新连接的时间,默认的时间是无穷(对于多并发情况下并不合适)。

  • 缓存复用
    在mongoSocket.Query 中使用sync.Pool 复用缓存序列化和反序列化。

func (socket *mongoSocket) Query(ops ...interface{}) (err error) {

    ......

    buf := bytesBufferPool.Get().([]byte)
    defer func() {
        bytesBufferPool.Put(buf[:0])
    }()

    ......

连接的实现逻辑:

mongo 调用逻辑.png

代码建议

使用session的方式

  • 所有的方案需要设置最大的连接池,保证并发情况下不要产生过多的连接.

  • copy方案:使用session 之前每次copy root session,生成一个新的session (记得完成请求后close copy 出来的session,不然socket 的引用计数不会归0,造成泄漏).

  • 是使用session 之前每次refresh一下,使用同一个session,每次不需要close session (也可以定时refresh session).

  • 使用同一个session,只有在请求发生错误的时候调用Refresh() 函数,正常请求下性能更高,即retry 模式,缺点是在集中错误恢的复情况下容易造成雪崩。可以配合限流、熔断、资源隔离等模式提高可用性。


代码演示 (更多workshop使用方法可以参考)

    ss, err := mgo.Dial("")
    if err != nil {
        panic(err)
    }

    defer ss.Close()
    pool := workshop.NewPool(3)

    var retried bool

    pms := workshop.NewPromise(pool, workshop.Process{
        Process: func(ctx context.Context, last interface{}) (interface{}, error) {
            id := "test-id-" + fmt.Sprint(1000)
            var result = map[string]interface{}{}
            err = ss.DB("test").C("concurrence").FindId(id).One(&result)
            return result, err
        },
    }).RecoverAndRetry(workshop.ExceptionProcess{
        Process: func(ctx context.Context, err error, last interface{}) (interface{}, error) {
            if retried {
                return nil, err
            }
            if err == mgo.ErrNotFound {
                return last, err
            }
            switch err.(type){
            case *mgo.QueryError:
                return last, err
            default:
                ss.Refresh()
                retried = true
                return last, nil
            }
        },
    })

    ctx, _ := context.WithTimeout(context.Background(), time.Second)
    data, err := pms.Get(ctx)
    if err != nil {
        // handle err
    }

    log.Println("get map data:", data.(map[string]interface{}))

文章来源于互联网:globalsign mgo 介绍

发表评论