mirror of https://github.com/docker/awesome-compose.git

add go mysql redis sample

Signed-off-by: vishal979 <shantapowertech2@gmail.com>
vishal979 2020-07-04 19:38:53 +05:30
parent 638c159abd
commit 207f73ab81
384 changed files with 233601 additions and 0 deletions

2
go-mysql-redis/vendor/gopkg.in/redis.v5/.gitignore generated vendored Normal file

@ -0,0 +1,2 @@
*.rdb
testdata/*/

22
go-mysql-redis/vendor/gopkg.in/redis.v5/.travis.yml generated vendored Normal file

@ -0,0 +1,22 @@
sudo: false
language: go

services:
  - redis-server

go:
  - 1.4
  - 1.6
  - 1.7
  - tip

matrix:
  allow_failures:
    - go: 1.4
    - go: tip

install:
  - go get github.com/onsi/ginkgo
  - go get github.com/onsi/gomega
  - mkdir -p $HOME/gopath/src/gopkg.in
  - mv `pwd` $HOME/gopath/src/gopkg.in/redis.v5

25
go-mysql-redis/vendor/gopkg.in/redis.v5/LICENSE generated vendored Normal file

@ -0,0 +1,25 @@
Copyright (c) 2013 The github.com/go-redis/redis Authors.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

19
go-mysql-redis/vendor/gopkg.in/redis.v5/Makefile generated vendored Normal file

@ -0,0 +1,19 @@
all: testdeps
	go test ./...
	go test ./... -short -race
	go vet

testdeps: testdata/redis/src/redis-server

bench: testdeps
	go test ./... -test.run=NONE -test.bench=. -test.benchmem

.PHONY: all test testdeps bench

testdata/redis:
	mkdir -p $@
	wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@

testdata/redis/src/redis-server: testdata/redis
	sed -i 's/libjemalloc.a/libjemalloc.a -lrt/g' $</src/Makefile
	cd $< && make all
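The `all` and `bench` targets are the entry points; a typical invocation (assuming GNU make and a Unix toolchain, since the test server is built from source) would be:

```shell
make        # build a local redis-server, then run tests and vet
make bench  # run the benchmarks (testdeps builds the server first)
```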

136
go-mysql-redis/vendor/gopkg.in/redis.v5/README.md generated vendored Normal file

@ -0,0 +1,136 @@
# Redis client for Golang [![Build Status](https://travis-ci.org/go-redis/redis.png?branch=v5)](https://travis-ci.org/go-redis/redis)
Supports:
- Redis 3 commands except QUIT, MONITOR, SLOWLOG and SYNC.
- [Pub/Sub](https://godoc.org/gopkg.in/redis.v5#PubSub).
- [Transactions](https://godoc.org/gopkg.in/redis.v5#Multi).
- [Pipeline](https://godoc.org/gopkg.in/redis.v5#example-Client-Pipeline) and [TxPipeline](https://godoc.org/gopkg.in/redis.v5#example-Client-TxPipeline).
- [Scripting](https://godoc.org/gopkg.in/redis.v5#Script).
- [Timeouts](https://godoc.org/gopkg.in/redis.v5#Options).
- [Redis Sentinel](https://godoc.org/gopkg.in/redis.v5#NewFailoverClient).
- [Redis Cluster](https://godoc.org/gopkg.in/redis.v5#NewClusterClient).
- [Ring](https://godoc.org/gopkg.in/redis.v5#NewRing).
- [Instrumentation](https://godoc.org/gopkg.in/redis.v5#ex-package--Instrumentation).
- [Cache friendly](https://github.com/go-redis/cache).
- [Rate limiting](https://github.com/go-redis/rate).
- [Distributed Locks](https://github.com/bsm/redis-lock).
API docs: https://godoc.org/gopkg.in/redis.v5.
Examples: https://godoc.org/gopkg.in/redis.v5#pkg-examples.
## Installation
Install:
```shell
go get gopkg.in/redis.v5
```
Import:
```go
import "gopkg.in/redis.v5"
```
## Quickstart
```go
func ExampleNewClient() {
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	pong, err := client.Ping().Result()
	fmt.Println(pong, err)
	// Output: PONG <nil>
}

func ExampleClient() {
	err := client.Set("key", "value", 0).Err()
	if err != nil {
		panic(err)
	}

	val, err := client.Get("key").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("key", val)

	val2, err := client.Get("key2").Result()
	if err == redis.Nil {
		fmt.Println("key2 does not exist")
	} else if err != nil {
		panic(err)
	} else {
		fmt.Println("key2", val2)
	}
	// Output: key value
	// key2 does not exist
}
```
## Howto
Please go through [examples](https://godoc.org/gopkg.in/redis.v5#pkg-examples) to get an idea how to use this package.
## Look and feel
Some corner cases:

    SET key value EX 10 NX
    set, err := client.SetNX("key", "value", 10*time.Second).Result()

    SORT list LIMIT 0 2 ASC
    vals, err := client.Sort("list", redis.Sort{Offset: 0, Count: 2, Order: "ASC"}).Result()

    ZRANGEBYSCORE zset -inf +inf WITHSCORES LIMIT 0 2
    vals, err := client.ZRangeByScoreWithScores("zset", redis.ZRangeBy{
        Min:    "-inf",
        Max:    "+inf",
        Offset: 0,
        Count:  2,
    }).Result()

    ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 AGGREGATE SUM
    vals, err := client.ZInterStore("out", redis.ZStore{Weights: []int64{2, 3}}, "zset1", "zset2").Result()

    EVAL "return {KEYS[1],ARGV[1]}" 1 "key" "hello"
    vals, err := client.Eval("return {KEYS[1],ARGV[1]}", []string{"key"}, []string{"hello"}).Result()
## Benchmark
go-redis vs redigo:
```
BenchmarkSetGoRedis10Conns64Bytes-4 200000 7621 ns/op 210 B/op 6 allocs/op
BenchmarkSetGoRedis100Conns64Bytes-4 200000 7554 ns/op 210 B/op 6 allocs/op
BenchmarkSetGoRedis10Conns1KB-4 200000 7697 ns/op 210 B/op 6 allocs/op
BenchmarkSetGoRedis100Conns1KB-4 200000 7688 ns/op 210 B/op 6 allocs/op
BenchmarkSetGoRedis10Conns10KB-4 200000 9214 ns/op 210 B/op 6 allocs/op
BenchmarkSetGoRedis100Conns10KB-4 200000 9181 ns/op 210 B/op 6 allocs/op
BenchmarkSetGoRedis10Conns1MB-4 2000 583242 ns/op 2337 B/op 6 allocs/op
BenchmarkSetGoRedis100Conns1MB-4 2000 583089 ns/op 2338 B/op 6 allocs/op
BenchmarkSetRedigo10Conns64Bytes-4 200000 7576 ns/op 208 B/op 7 allocs/op
BenchmarkSetRedigo100Conns64Bytes-4 200000 7782 ns/op 208 B/op 7 allocs/op
BenchmarkSetRedigo10Conns1KB-4 200000 7958 ns/op 208 B/op 7 allocs/op
BenchmarkSetRedigo100Conns1KB-4 200000 7725 ns/op 208 B/op 7 allocs/op
BenchmarkSetRedigo10Conns10KB-4 100000 18442 ns/op 208 B/op 7 allocs/op
BenchmarkSetRedigo100Conns10KB-4 100000 18818 ns/op 208 B/op 7 allocs/op
BenchmarkSetRedigo10Conns1MB-4 2000 668829 ns/op 226 B/op 7 allocs/op
BenchmarkSetRedigo100Conns1MB-4 2000 679542 ns/op 226 B/op 7 allocs/op
```
Redis Cluster:
```
BenchmarkRedisPing-4 200000 6983 ns/op 116 B/op 4 allocs/op
BenchmarkRedisClusterPing-4 100000 11535 ns/op 117 B/op 4 allocs/op
```
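The numbers above presumably come from the package's own benchmark suite; they should be reproducible with the Makefile's `bench` target shown earlier, i.e. roughly:

```shell
go test ./... -test.run=NONE -test.bench=. -test.benchmem
```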
## Shameless plug
Check my [PostgreSQL client for Go](https://github.com/go-pg/pg).

940
go-mysql-redis/vendor/gopkg.in/redis.v5/cluster.go generated vendored Normal file

@ -0,0 +1,940 @@
package redis
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"gopkg.in/redis.v5/internal"
"gopkg.in/redis.v5/internal/hashtag"
"gopkg.in/redis.v5/internal/pool"
"gopkg.in/redis.v5/internal/proto"
)
var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes")
var errNilClusterState = internal.RedisError("redis: cannot load cluster slots")
// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
// A seed list of host:port addresses of cluster nodes.
Addrs []string
// The maximum number of retries before giving up. Command is retried
// on network errors and MOVED/ASK redirects.
// Default is 16.
MaxRedirects int
// Enables read queries for a connection to a Redis Cluster slave node.
ReadOnly bool
// Enables routing read-only queries to the closest master or slave node.
RouteByLatency bool
// Following options are copied from Options struct.
Password string
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
// PoolSize applies per cluster node and not for the whole cluster.
PoolSize int
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
}
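// A minimal usage sketch (not part of this file; the addresses are
// placeholders for real cluster nodes):
//
//	client := NewClusterClient(&ClusterOptions{
//		Addrs: []string{"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"},
//	})
//	if err := client.Ping().Err(); err != nil {
//		// handle connection error
//	}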
func (opt *ClusterOptions) init() {
if opt.MaxRedirects == -1 {
opt.MaxRedirects = 0
} else if opt.MaxRedirects == 0 {
opt.MaxRedirects = 16
}
if opt.RouteByLatency {
opt.ReadOnly = true
}
}
func (opt *ClusterOptions) clientOptions() *Options {
const disableIdleCheck = -1
return &Options{
Password: opt.Password,
ReadOnly: opt.ReadOnly,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
// IdleCheckFrequency is not copied to disable reaper
IdleCheckFrequency: disableIdleCheck,
}
}
//------------------------------------------------------------------------------
type clusterNode struct {
Client *Client
Latency time.Duration
loading time.Time
}
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
opt := clOpt.clientOptions()
opt.Addr = addr
node := clusterNode{
Client: NewClient(opt),
}
if clOpt.RouteByLatency {
node.updateLatency()
}
return &node
}
func (n *clusterNode) updateLatency() {
const probes = 10
for i := 0; i < probes; i++ {
start := time.Now()
n.Client.Ping()
n.Latency += time.Since(start)
}
n.Latency = n.Latency / probes
}
func (n *clusterNode) Loading() bool {
return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
}
//------------------------------------------------------------------------------
type clusterNodes struct {
opt *ClusterOptions
mu sync.RWMutex
addrs []string
nodes map[string]*clusterNode
closed bool
}
func newClusterNodes(opt *ClusterOptions) *clusterNodes {
return &clusterNodes{
opt: opt,
nodes: make(map[string]*clusterNode),
}
}
func (c *clusterNodes) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil
}
c.closed = true
var firstErr error
for _, node := range c.nodes {
if err := node.Client.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
c.addrs = nil
c.nodes = nil
return firstErr
}
func (c *clusterNodes) All() ([]*clusterNode, error) {
c.mu.RLock()
defer c.mu.RUnlock()
if c.closed {
return nil, pool.ErrClosed
}
nodes := make([]*clusterNode, 0, len(c.nodes))
for _, node := range c.nodes {
nodes = append(nodes, node)
}
return nodes, nil
}
func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
var node *clusterNode
var ok bool
c.mu.RLock()
if !c.closed {
node, ok = c.nodes[addr]
}
c.mu.RUnlock()
if ok {
return node, nil
}
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil, pool.ErrClosed
}
node, ok = c.nodes[addr]
if ok {
return node, nil
}
c.addrs = append(c.addrs, addr)
node = newClusterNode(c.opt, addr)
c.nodes[addr] = node
return node, nil
}
func (c *clusterNodes) Random() (*clusterNode, error) {
c.mu.RLock()
closed := c.closed
addrs := c.addrs
c.mu.RUnlock()
if closed {
return nil, pool.ErrClosed
}
if len(addrs) == 0 {
return nil, errClusterNoNodes
}
var nodeErr error
for i := 0; i <= c.opt.MaxRedirects; i++ {
n := rand.Intn(len(addrs))
node, err := c.Get(addrs[n])
if err != nil {
return nil, err
}
nodeErr = node.Client.ClusterInfo().Err()
if nodeErr == nil {
return node, nil
}
}
return nil, nodeErr
}
//------------------------------------------------------------------------------
type clusterState struct {
nodes *clusterNodes
slots [][]*clusterNode
}
func newClusterState(nodes *clusterNodes, slots []ClusterSlot) (*clusterState, error) {
c := clusterState{
nodes: nodes,
slots: make([][]*clusterNode, hashtag.SlotNumber),
}
for _, slot := range slots {
var nodes []*clusterNode
for _, slotNode := range slot.Nodes {
node, err := c.nodes.Get(slotNode.Addr)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
for i := slot.Start; i <= slot.End; i++ {
c.slots[i] = nodes
}
}
return &c, nil
}
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
if len(nodes) > 0 {
return nodes[0], nil
}
return c.nodes.Random()
}
func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
switch len(nodes) {
case 0:
return c.nodes.Random()
case 1:
return nodes[0], nil
case 2:
if slave := nodes[1]; !slave.Loading() {
return slave, nil
}
return nodes[0], nil
default:
var slave *clusterNode
for i := 0; i < 10; i++ {
n := rand.Intn(len(nodes)-1) + 1
slave = nodes[n]
if !slave.Loading() {
break
}
}
return slave, nil
}
}
func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
const threshold = time.Millisecond
nodes := c.slotNodes(slot)
if len(nodes) == 0 {
return c.nodes.Random()
}
var node *clusterNode
for _, n := range nodes {
if n.Loading() {
continue
}
if node == nil || node.Latency-n.Latency > threshold {
node = n
}
}
return node, nil
}
func (c *clusterState) slotNodes(slot int) []*clusterNode {
if slot < len(c.slots) {
return c.slots[slot]
}
return nil
}
//------------------------------------------------------------------------------
// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
cmdable
opt *ClusterOptions
cmds map[string]*CommandInfo
nodes *clusterNodes
_state atomic.Value
// Reports whether slots reloading is in progress.
reloading uint32
closed bool
}
// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt.init()
c := &ClusterClient{
opt: opt,
nodes: newClusterNodes(opt),
}
c.cmdable.process = c.Process
// Add initial nodes.
for _, addr := range opt.Addrs {
_, _ = c.nodes.Get(addr)
}
// Preload cluster slots.
for i := 0; i < 10; i++ {
state, err := c.reloadSlots()
if err == nil {
c._state.Store(state)
break
}
}
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
return c
}
func (c *ClusterClient) state() *clusterState {
v := c._state.Load()
if v != nil {
return v.(*clusterState)
}
c.lazyReloadSlots()
return nil
}
func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
if state == nil {
node, err := c.nodes.Random()
return 0, node, err
}
cmdInfo := c.cmds[cmd.name()]
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
slot := hashtag.Slot(firstKey)
if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
if c.opt.RouteByLatency {
node, err := state.slotClosestNode(slot)
return slot, node, err
}
node, err := state.slotSlaveNode(slot)
return slot, node, err
}
node, err := state.slotMasterNode(slot)
return slot, node, err
}
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
state := c.state()
var node *clusterNode
var err error
if state != nil && len(keys) > 0 {
node, err = state.slotMasterNode(hashtag.Slot(keys[0]))
} else {
node, err = c.nodes.Random()
}
if err != nil {
return err
}
return node.Client.Watch(fn, keys...)
}
// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
return c.nodes.Close()
}
func (c *ClusterClient) Process(cmd Cmder) error {
slot, node, err := c.cmdSlotAndNode(c.state(), cmd)
if err != nil {
cmd.setErr(err)
return err
}
var ask bool
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if ask {
pipe := node.Client.Pipeline()
pipe.Process(NewCmd("ASKING"))
pipe.Process(cmd)
_, err = pipe.Exec()
pipe.Close()
ask = false
} else {
err = node.Client.Process(cmd)
}
// If there is no (real) error - we are done.
if err == nil {
return nil
}
// If slave is loading - read from master.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
node.loading = time.Now()
continue
}
// On network errors try random node.
if internal.IsRetryableError(err) {
node, err = c.nodes.Random()
if err != nil {
cmd.setErr(err)
return err
}
continue
}
var moved bool
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
state := c.state()
if state != nil && slot >= 0 {
master, _ := state.slotMasterNode(slot)
if moved && (master == nil || master.Client.getAddr() != addr) {
c.lazyReloadSlots()
}
}
node, err = c.nodes.Get(addr)
if err != nil {
cmd.setErr(err)
return err
}
continue
}
break
}
return cmd.Err()
}
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
nodes, err := c.nodes.All()
if err != nil {
return err
}
var wg sync.WaitGroup
errCh := make(chan error, 1)
for _, node := range nodes {
wg.Add(1)
go func(node *clusterNode) {
defer wg.Done()
err := fn(node.Client)
if err != nil {
select {
case errCh <- err:
default:
}
}
}(node)
}
wg.Wait()
select {
case err := <-errCh:
return err
default:
return nil
}
}
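// For example (a sketch), pinging every known node and returning the first
// failure, if any:
//
//	err := client.ForEachNode(func(c *Client) error {
//		return c.Ping().Err()
//	})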
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
state := c.state()
if state == nil {
return errNilClusterState
}
var wg sync.WaitGroup
visited := make(map[*clusterNode]struct{})
errCh := make(chan error, 1)
for _, nodes := range state.slots {
if len(nodes) == 0 {
continue
}
master := nodes[0]
if _, ok := visited[master]; ok {
continue
}
visited[master] = struct{}{}
wg.Add(1)
go func(node *clusterNode) {
defer wg.Done()
err := fn(node.Client)
if err != nil {
select {
case errCh <- err:
default:
}
}
}(master)
}
wg.Wait()
select {
case err := <-errCh:
return err
default:
return nil
}
}
// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
var acc PoolStats
nodes, err := c.nodes.All()
if err != nil {
return &acc
}
for _, node := range nodes {
s := node.Client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
}
return &acc
}
func (c *ClusterClient) lazyReloadSlots() {
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
return
}
go func() {
for i := 0; i < 1000; i++ {
state, err := c.reloadSlots()
if err == pool.ErrClosed {
break
}
if err == nil {
c._state.Store(state)
break
}
time.Sleep(time.Millisecond)
}
time.Sleep(3 * time.Second)
atomic.StoreUint32(&c.reloading, 0)
}()
}
func (c *ClusterClient) reloadSlots() (*clusterState, error) {
node, err := c.nodes.Random()
if err != nil {
return nil, err
}
// TODO: fix race
if c.cmds == nil {
cmds, err := node.Client.Command().Result()
if err != nil {
return nil, err
}
c.cmds = cmds
}
slots, err := node.Client.ClusterSlots().Result()
if err != nil {
return nil, err
}
return newClusterState(c.nodes, slots)
}
// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
ticker := time.NewTicker(idleCheckFrequency)
defer ticker.Stop()
for range ticker.C {
nodes, err := c.nodes.All()
if err != nil {
break
}
var n int
for _, node := range nodes {
nn, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
if err != nil {
internal.Logf("ReapStaleConns failed: %s", err)
} else {
n += nn
}
}
s := c.PoolStats()
internal.Logf(
"reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
)
}
}
func (c *ClusterClient) Pipeline() *Pipeline {
pipe := Pipeline{
exec: c.pipelineExec,
}
pipe.cmdable.process = pipe.Process
pipe.statefulCmdable.process = pipe.Process
return &pipe
}
func (c *ClusterClient) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}
func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsByNode(cmds)
if err != nil {
return err
}
for i := 0; i <= c.opt.MaxRedirects; i++ {
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
cn, _, err := node.Client.conn()
if err != nil {
setCmdsErr(cmds, err)
continue
}
err = c.pipelineProcessCmds(cn, cmds, failedCmds)
node.Client.putConn(cn, err, false)
}
if len(failedCmds) == 0 {
break
}
cmdsMap = failedCmds
}
var firstErr error
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
firstErr = err
break
}
}
return firstErr
}
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
state := c.state()
cmdsMap := make(map[*clusterNode][]Cmder)
for _, cmd := range cmds {
_, node, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
return nil, err
}
cmdsMap[node] = append(cmdsMap[node], cmd)
}
return cmdsMap, nil
}
func (c *ClusterClient) pipelineProcessCmds(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := writeCmd(cn, cmds...); err != nil {
setCmdsErr(cmds, err)
return err
}
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
return c.pipelineReadCmds(cn, cmds, failedCmds)
}
func (c *ClusterClient) pipelineReadCmds(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
var firstErr error
for _, cmd := range cmds {
err := cmd.readReply(cn)
if err == nil {
continue
}
if firstErr == nil {
firstErr = err
}
err = c.checkMovedErr(cmd, failedCmds)
if err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}
func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error {
moved, ask, addr := internal.IsMovedError(cmd.Err())
if moved {
c.lazyReloadSlots()
node, err := c.nodes.Get(addr)
if err != nil {
return err
}
failedCmds[node] = append(failedCmds[node], cmd)
}
if ask {
node, err := c.nodes.Get(addr)
if err != nil {
return err
}
failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
}
return nil
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() *Pipeline {
pipe := Pipeline{
exec: c.txPipelineExec,
}
pipe.cmdable.process = pipe.Process
pipe.statefulCmdable.process = pipe.Process
return &pipe
}
func (c *ClusterClient) TxPipelined(fn func(*Pipeline) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}
func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsBySlot(cmds)
if err != nil {
return err
}
state := c.state()
if state == nil {
return errNilClusterState
}
for slot, cmds := range cmdsMap {
node, err := state.slotMasterNode(slot)
if err != nil {
setCmdsErr(cmds, err)
continue
}
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
for i := 0; i <= c.opt.MaxRedirects; i++ {
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
cn, _, err := node.Client.conn()
if err != nil {
setCmdsErr(cmds, err)
continue
}
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
node.Client.putConn(cn, err, false)
}
if len(failedCmds) == 0 {
break
}
cmdsMap = failedCmds
}
}
var firstErr error
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
firstErr = err
break
}
}
return firstErr
}
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) (map[int][]Cmder, error) {
state := c.state()
cmdsMap := make(map[int][]Cmder)
for _, cmd := range cmds {
slot, _, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
return nil, err
}
cmdsMap[slot] = append(cmdsMap[slot], cmd)
}
return cmdsMap, nil
}
func (c *ClusterClient) txPipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := txPipelineWriteMulti(cn, cmds); err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
return err
}
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
return err
}
_, err := pipelineReadCmds(cn, cmds)
return err
}
func (c *ClusterClient) txPipelineReadQueued(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
var firstErr error
// Parse queued replies.
var statusCmd StatusCmd
if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
firstErr = err
}
for _, cmd := range cmds {
err := statusCmd.readReply(cn)
if err == nil {
continue
}
cmd.setErr(err)
if firstErr == nil {
firstErr = err
}
err = c.checkMovedErr(cmd, failedCmds)
if err != nil && firstErr == nil {
firstErr = err
}
}
// Parse number of replies.
line, err := cn.Rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr
}
return err
}
switch line[0] {
case proto.ErrorReply:
return proto.ParseErrorReply(line)
case proto.ArrayReply:
// ok
default:
err := fmt.Errorf("redis: expected '*', but got line %q", line)
return err
}
return firstErr
}

956
go-mysql-redis/vendor/gopkg.in/redis.v5/command.go generated vendored Normal file

@ -0,0 +1,956 @@
package redis
import (
"bytes"
"fmt"
"strconv"
"strings"
"time"
"gopkg.in/redis.v5/internal"
"gopkg.in/redis.v5/internal/pool"
"gopkg.in/redis.v5/internal/proto"
)
var (
_ Cmder = (*Cmd)(nil)
_ Cmder = (*SliceCmd)(nil)
_ Cmder = (*StatusCmd)(nil)
_ Cmder = (*IntCmd)(nil)
_ Cmder = (*DurationCmd)(nil)
_ Cmder = (*BoolCmd)(nil)
_ Cmder = (*StringCmd)(nil)
_ Cmder = (*FloatCmd)(nil)
_ Cmder = (*StringSliceCmd)(nil)
_ Cmder = (*BoolSliceCmd)(nil)
_ Cmder = (*StringStringMapCmd)(nil)
_ Cmder = (*StringIntMapCmd)(nil)
_ Cmder = (*ZSliceCmd)(nil)
_ Cmder = (*ScanCmd)(nil)
_ Cmder = (*ClusterSlotsCmd)(nil)
)
type Cmder interface {
args() []interface{}
arg(int) string
name() string
readReply(*pool.Conn) error
setErr(error)
readTimeout() *time.Duration
Err() error
fmt.Stringer
}
func setCmdsErr(cmds []Cmder, e error) {
for _, cmd := range cmds {
cmd.setErr(e)
}
}
func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
cn.Wb.Reset()
for _, cmd := range cmds {
if err := cn.Wb.Append(cmd.args()); err != nil {
return err
}
}
_, err := cn.Write(cn.Wb.Bytes())
return err
}
func cmdString(cmd Cmder, val interface{}) string {
var ss []string
for _, arg := range cmd.args() {
ss = append(ss, fmt.Sprint(arg))
}
s := strings.Join(ss, " ")
if err := cmd.Err(); err != nil {
return s + ": " + err.Error()
}
if val != nil {
switch vv := val.(type) {
case []byte:
return s + ": " + string(vv)
default:
return s + ": " + fmt.Sprint(val)
}
}
return s
}
func cmdFirstKeyPos(cmd Cmder, info *CommandInfo) int {
switch cmd.name() {
case "eval", "evalsha":
if cmd.arg(2) != "0" {
return 3
} else {
return -1
}
}
if info == nil {
internal.Logf("info for cmd=%s not found", cmd.name())
return -1
}
return int(info.FirstKeyPos)
}
//------------------------------------------------------------------------------
type baseCmd struct {
_args []interface{}
err error
_readTimeout *time.Duration
}
func (cmd *baseCmd) Err() error {
if cmd.err != nil {
return cmd.err
}
return nil
}
func (cmd *baseCmd) args() []interface{} {
return cmd._args
}
func (cmd *baseCmd) arg(pos int) string {
if pos < 0 || pos >= len(cmd._args) {
return ""
}
s, _ := cmd._args[pos].(string)
return s
}
func (cmd *baseCmd) name() string {
if len(cmd._args) > 0 {
// Cmd name must be lower cased.
s := internal.ToLower(cmd.arg(0))
cmd._args[0] = s
return s
}
return ""
}
func (cmd *baseCmd) readTimeout() *time.Duration {
return cmd._readTimeout
}
func (cmd *baseCmd) setReadTimeout(d time.Duration) {
cmd._readTimeout = &d
}
func (cmd *baseCmd) setErr(e error) {
cmd.err = e
}
func newBaseCmd(args []interface{}) baseCmd {
if len(args) > 0 {
// Cmd name is expected to be in lower case.
args[0] = internal.ToLower(args[0].(string))
}
return baseCmd{_args: args}
}
//------------------------------------------------------------------------------
type Cmd struct {
baseCmd
val interface{}
}
func NewCmd(args ...interface{}) *Cmd {
return &Cmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *Cmd) Val() interface{} {
return cmd.val
}
func (cmd *Cmd) Result() (interface{}, error) {
return cmd.val, cmd.err
}
func (cmd *Cmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *Cmd) readReply(cn *pool.Conn) error {
cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser)
if cmd.err != nil {
return cmd.err
}
if b, ok := cmd.val.([]byte); ok {
// Bytes must be copied, because underlying memory is reused.
cmd.val = string(b)
}
return nil
}
//------------------------------------------------------------------------------
type SliceCmd struct {
baseCmd
val []interface{}
}
func NewSliceCmd(args ...interface{}) *SliceCmd {
return &SliceCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *SliceCmd) Val() []interface{} {
return cmd.val
}
func (cmd *SliceCmd) Result() ([]interface{}, error) {
return cmd.val, cmd.err
}
func (cmd *SliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *SliceCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(sliceParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.([]interface{})
return nil
}
//------------------------------------------------------------------------------
type StatusCmd struct {
baseCmd
val string
}
func NewStatusCmd(args ...interface{}) *StatusCmd {
return &StatusCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *StatusCmd) Val() string {
return cmd.val
}
func (cmd *StatusCmd) Result() (string, error) {
return cmd.val, cmd.err
}
func (cmd *StatusCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StatusCmd) readReply(cn *pool.Conn) error {
cmd.val, cmd.err = cn.Rd.ReadStringReply()
return cmd.err
}
//------------------------------------------------------------------------------
type IntCmd struct {
baseCmd
val int64
}
func NewIntCmd(args ...interface{}) *IntCmd {
return &IntCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *IntCmd) Val() int64 {
return cmd.val
}
func (cmd *IntCmd) Result() (int64, error) {
return cmd.val, cmd.err
}
func (cmd *IntCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *IntCmd) readReply(cn *pool.Conn) error {
cmd.val, cmd.err = cn.Rd.ReadIntReply()
return cmd.err
}
//------------------------------------------------------------------------------
type DurationCmd struct {
baseCmd
val time.Duration
precision time.Duration
}
func NewDurationCmd(precision time.Duration, args ...interface{}) *DurationCmd {
return &DurationCmd{
baseCmd: baseCmd{_args: args},
precision: precision,
}
}
func (cmd *DurationCmd) Val() time.Duration {
return cmd.val
}
func (cmd *DurationCmd) Result() (time.Duration, error) {
return cmd.val, cmd.err
}
func (cmd *DurationCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *DurationCmd) readReply(cn *pool.Conn) error {
var n int64
n, cmd.err = cn.Rd.ReadIntReply()
if cmd.err != nil {
return cmd.err
}
cmd.val = time.Duration(n) * cmd.precision
return nil
}
//------------------------------------------------------------------------------
type TimeCmd struct {
baseCmd
val time.Time
}
func NewTimeCmd(args ...interface{}) *TimeCmd {
return &TimeCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *TimeCmd) Val() time.Time {
return cmd.val
}
func (cmd *TimeCmd) Result() (time.Time, error) {
return cmd.val, cmd.err
}
func (cmd *TimeCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(timeParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.(time.Time)
return nil
}
//------------------------------------------------------------------------------
type BoolCmd struct {
baseCmd
val bool
}
func NewBoolCmd(args ...interface{}) *BoolCmd {
return &BoolCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *BoolCmd) Val() bool {
return cmd.val
}
func (cmd *BoolCmd) Result() (bool, error) {
return cmd.val, cmd.err
}
func (cmd *BoolCmd) String() string {
return cmdString(cmd, cmd.val)
}
var ok = []byte("OK")
func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadReply(nil)
// `SET key value NX` returns nil when key already exists. But
// `SETNX key value` returns bool (0/1). So convert nil to bool.
// TODO: is this okay?
if cmd.err == Nil {
cmd.val = false
cmd.err = nil
return nil
}
if cmd.err != nil {
return cmd.err
}
switch v := v.(type) {
case int64:
cmd.val = v == 1
return nil
case []byte:
cmd.val = bytes.Equal(v, ok)
return nil
default:
cmd.err = fmt.Errorf("got %T, wanted int64 or string", v)
return cmd.err
}
}
//------------------------------------------------------------------------------
type StringCmd struct {
baseCmd
val []byte
}
func NewStringCmd(args ...interface{}) *StringCmd {
return &StringCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *StringCmd) Val() string {
return internal.BytesToString(cmd.val)
}
func (cmd *StringCmd) Result() (string, error) {
return cmd.Val(), cmd.err
}
func (cmd *StringCmd) Bytes() ([]byte, error) {
return cmd.val, cmd.err
}
func (cmd *StringCmd) Int64() (int64, error) {
if cmd.err != nil {
return 0, cmd.err
}
return strconv.ParseInt(cmd.Val(), 10, 64)
}
func (cmd *StringCmd) Uint64() (uint64, error) {
if cmd.err != nil {
return 0, cmd.err
}
return strconv.ParseUint(cmd.Val(), 10, 64)
}
func (cmd *StringCmd) Float64() (float64, error) {
if cmd.err != nil {
return 0, cmd.err
}
return strconv.ParseFloat(cmd.Val(), 64)
}
func (cmd *StringCmd) Scan(val interface{}) error {
if cmd.err != nil {
return cmd.err
}
return proto.Scan(cmd.val, val)
}
func (cmd *StringCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StringCmd) readReply(cn *pool.Conn) error {
cmd.val, cmd.err = cn.Rd.ReadBytesReply()
return cmd.err
}
//------------------------------------------------------------------------------
type FloatCmd struct {
baseCmd
val float64
}
func NewFloatCmd(args ...interface{}) *FloatCmd {
return &FloatCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *FloatCmd) Val() float64 {
return cmd.val
}
func (cmd *FloatCmd) Result() (float64, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *FloatCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *FloatCmd) readReply(cn *pool.Conn) error {
cmd.val, cmd.err = cn.Rd.ReadFloatReply()
return cmd.err
}
//------------------------------------------------------------------------------
type StringSliceCmd struct {
baseCmd
val []string
}
func NewStringSliceCmd(args ...interface{}) *StringSliceCmd {
return &StringSliceCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *StringSliceCmd) Val() []string {
return cmd.val
}
func (cmd *StringSliceCmd) Result() ([]string, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *StringSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StringSliceCmd) ScanSlice(container interface{}) error {
return proto.ScanSlice(cmd.Val(), container)
}
func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.([]string)
return nil
}
//------------------------------------------------------------------------------
type BoolSliceCmd struct {
baseCmd
val []bool
}
func NewBoolSliceCmd(args ...interface{}) *BoolSliceCmd {
return &BoolSliceCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *BoolSliceCmd) Val() []bool {
return cmd.val
}
func (cmd *BoolSliceCmd) Result() ([]bool, error) {
return cmd.val, cmd.err
}
func (cmd *BoolSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.([]bool)
return nil
}
//------------------------------------------------------------------------------
type StringStringMapCmd struct {
baseCmd
val map[string]string
}
func NewStringStringMapCmd(args ...interface{}) *StringStringMapCmd {
return &StringStringMapCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *StringStringMapCmd) Val() map[string]string {
return cmd.val
}
func (cmd *StringStringMapCmd) Result() (map[string]string, error) {
return cmd.val, cmd.err
}
func (cmd *StringStringMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.(map[string]string)
return nil
}
//------------------------------------------------------------------------------
type StringIntMapCmd struct {
baseCmd
val map[string]int64
}
func NewStringIntMapCmd(args ...interface{}) *StringIntMapCmd {
return &StringIntMapCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *StringIntMapCmd) Val() map[string]int64 {
return cmd.val
}
func (cmd *StringIntMapCmd) Result() (map[string]int64, error) {
return cmd.val, cmd.err
}
func (cmd *StringIntMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.(map[string]int64)
return nil
}
//------------------------------------------------------------------------------
type ZSliceCmd struct {
baseCmd
val []Z
}
func NewZSliceCmd(args ...interface{}) *ZSliceCmd {
return &ZSliceCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *ZSliceCmd) Val() []Z {
return cmd.val
}
func (cmd *ZSliceCmd) Result() ([]Z, error) {
return cmd.val, cmd.err
}
func (cmd *ZSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.([]Z)
return nil
}
//------------------------------------------------------------------------------
type ScanCmd struct {
baseCmd
page []string
cursor uint64
process func(cmd Cmder) error
}
func NewScanCmd(process func(cmd Cmder) error, args ...interface{}) *ScanCmd {
return &ScanCmd{
baseCmd: baseCmd{_args: args},
process: process,
}
}
func (cmd *ScanCmd) Val() (keys []string, cursor uint64) {
return cmd.page, cmd.cursor
}
func (cmd *ScanCmd) Result() (keys []string, cursor uint64, err error) {
return cmd.page, cmd.cursor, cmd.err
}
func (cmd *ScanCmd) String() string {
return cmdString(cmd, cmd.page)
}
func (cmd *ScanCmd) readReply(cn *pool.Conn) error {
cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply()
return cmd.err
}
// Iterator creates a new ScanIterator.
func (cmd *ScanCmd) Iterator() *ScanIterator {
return &ScanIterator{
cmd: cmd,
}
}
//------------------------------------------------------------------------------
type ClusterNode struct {
Id string
Addr string
}
type ClusterSlot struct {
Start int
End int
Nodes []ClusterNode
}
type ClusterSlotsCmd struct {
baseCmd
val []ClusterSlot
}
func NewClusterSlotsCmd(args ...interface{}) *ClusterSlotsCmd {
return &ClusterSlotsCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *ClusterSlotsCmd) Val() []ClusterSlot {
return cmd.val
}
func (cmd *ClusterSlotsCmd) Result() ([]ClusterSlot, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *ClusterSlotsCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.([]ClusterSlot)
return nil
}
//------------------------------------------------------------------------------
// GeoLocation is used with GeoAdd to add geospatial location.
type GeoLocation struct {
Name string
Longitude, Latitude, Dist float64
GeoHash int64
}
// GeoRadiusQuery is used with GeoRadius to query geospatial index.
type GeoRadiusQuery struct {
Radius float64
// Can be m, km, ft, or mi. Default is km.
Unit string
WithCoord bool
WithDist bool
WithGeoHash bool
Count int
// Can be ASC or DESC. Default is no sort order.
Sort string
}
type GeoLocationCmd struct {
baseCmd
q *GeoRadiusQuery
locations []GeoLocation
}
func NewGeoLocationCmd(q *GeoRadiusQuery, args ...interface{}) *GeoLocationCmd {
args = append(args, q.Radius)
if q.Unit != "" {
args = append(args, q.Unit)
} else {
args = append(args, "km")
}
if q.WithCoord {
args = append(args, "WITHCOORD")
}
if q.WithDist {
args = append(args, "WITHDIST")
}
if q.WithGeoHash {
args = append(args, "WITHHASH")
}
if q.Count > 0 {
args = append(args, "COUNT", q.Count)
}
if q.Sort != "" {
args = append(args, q.Sort)
}
cmd := newBaseCmd(args)
return &GeoLocationCmd{
baseCmd: cmd,
q: q,
}
}
func (cmd *GeoLocationCmd) Val() []GeoLocation {
return cmd.locations
}
func (cmd *GeoLocationCmd) Result() ([]GeoLocation, error) {
return cmd.locations, cmd.err
}
func (cmd *GeoLocationCmd) String() string {
return cmdString(cmd, cmd.locations)
}
func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
if cmd.err != nil {
return cmd.err
}
cmd.locations = v.([]GeoLocation)
return nil
}
//------------------------------------------------------------------------------
type GeoPos struct {
Longitude, Latitude float64
}
type GeoPosCmd struct {
baseCmd
positions []*GeoPos
}
func NewGeoPosCmd(args ...interface{}) *GeoPosCmd {
return &GeoPosCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *GeoPosCmd) Val() []*GeoPos {
return cmd.positions
}
func (cmd *GeoPosCmd) Result() ([]*GeoPos, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *GeoPosCmd) String() string {
return cmdString(cmd, cmd.positions)
}
func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(geoPosSliceParser)
if cmd.err != nil {
return cmd.err
}
cmd.positions = v.([]*GeoPos)
return nil
}
//------------------------------------------------------------------------------
type CommandInfo struct {
Name string
Arity int8
Flags []string
FirstKeyPos int8
LastKeyPos int8
StepCount int8
ReadOnly bool
}
type CommandsInfoCmd struct {
baseCmd
val map[string]*CommandInfo
}
func NewCommandsInfoCmd(args ...interface{}) *CommandsInfoCmd {
return &CommandsInfoCmd{
baseCmd: baseCmd{_args: args},
}
}
func (cmd *CommandsInfoCmd) Val() map[string]*CommandInfo {
return cmd.val
}
func (cmd *CommandsInfoCmd) Result() (map[string]*CommandInfo, error) {
return cmd.Val(), cmd.Err()
}
func (cmd *CommandsInfoCmd) String() string {
return cmdString(cmd, cmd.val)
}
func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
var v interface{}
v, cmd.err = cn.Rd.ReadArrayReply(commandInfoSliceParser)
if cmd.err != nil {
return cmd.err
}
cmd.val = v.(map[string]*CommandInfo)
return nil
}

2078
go-mysql-redis/vendor/gopkg.in/redis.v5/commands.go generated vendored Normal file

File diff suppressed because it is too large.

4
go-mysql-redis/vendor/gopkg.in/redis.v5/doc.go generated vendored Normal file

@ -0,0 +1,4 @@
/*
Package redis implements a Redis client.
*/
package redis

81
go-mysql-redis/vendor/gopkg.in/redis.v5/internal/consistenthash/consistenthash.go generated vendored Normal file

@ -0,0 +1,81 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package consistenthash provides an implementation of a ring hash.
package consistenthash
import (
"hash/crc32"
"sort"
"strconv"
)
type Hash func(data []byte) uint32
type Map struct {
hash Hash
replicas int
keys []int // Sorted
hashMap map[int]string
}
func New(replicas int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
}
if m.hash == nil {
m.hash = crc32.ChecksumIEEE
}
return m
}
// Returns true if there are no items available.
func (m *Map) IsEmpty() bool {
return len(m.keys) == 0
}
// Adds some keys to the hash.
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}
// Gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
if m.IsEmpty() {
return ""
}
hash := int(m.hash([]byte(key)))
// Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
// Means we have cycled back to the first replica.
if idx == len(m.keys) {
idx = 0
}
return m.hashMap[m.keys[idx]]
}
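// A minimal usage sketch (names and replica count are illustrative):
//
//	m := New(3, nil) // 3 virtual nodes per key, default crc32 hash
//	m.Add("serverA", "serverB", "serverC")
//	owner := m.Get("user:1000") // consistently maps the key to one server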

75
go-mysql-redis/vendor/gopkg.in/redis.v5/internal/errors.go generated vendored Normal file

@ -0,0 +1,75 @@
package internal
import (
"io"
"net"
"strings"
)
const Nil = RedisError("redis: nil")
type RedisError string
func (e RedisError) Error() string { return string(e) }
func IsRetryableError(err error) bool {
return IsNetworkError(err)
}
func IsInternalError(err error) bool {
_, ok := err.(RedisError)
return ok
}
func IsNetworkError(err error) bool {
if err == io.EOF {
return true
}
_, ok := err.(net.Error)
return ok
}
func IsBadConn(err error, allowTimeout bool) bool {
if err == nil {
return false
}
if IsInternalError(err) {
return false
}
if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
return false
}
}
return true
}
func IsMovedError(err error) (moved bool, ask bool, addr string) {
if !IsInternalError(err) {
return
}
s := err.Error()
if strings.HasPrefix(s, "MOVED ") {
moved = true
} else if strings.HasPrefix(s, "ASK ") {
ask = true
} else {
return
}
ind := strings.LastIndex(s, " ")
if ind == -1 {
return false, false, ""
}
addr = s[ind+1:]
return
}
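// For example, the reply "MOVED 3999 127.0.0.1:6381" yields
// moved=true, ask=false, addr="127.0.0.1:6381".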
func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING")
}
func IsExecAbortError(err error) bool {
return strings.HasPrefix(err.Error(), "EXECABORT")
}

73
go-mysql-redis/vendor/gopkg.in/redis.v5/internal/hashtag/hashtag.go generated vendored Normal file

@ -0,0 +1,73 @@
package hashtag
import (
"math/rand"
"strings"
)
const SlotNumber = 16384
// CRC16 implementation according to CCITT standards.
// Copyright 2001-2010 Georges Menie (www.menie.org)
// Copyright 2013 The Go Authors. All rights reserved.
// http://redis.io/topics/cluster-spec#appendix-a-crc16-reference-implementation-in-ansi-c
var crc16tab = [256]uint16{
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
}
func Key(key string) string {
if s := strings.IndexByte(key, '{'); s > -1 {
if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
return key[s+1 : s+e+1]
}
}
return key
}
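// For example, Key("{user1000}.following") returns "user1000", so all keys
// sharing that hash tag map to the same slot; a key without braces is used as-is.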
// Slot returns a consistent slot number between 0 and 16383
// for any given string key.
func Slot(key string) int {
key = Key(key)
if key == "" {
return rand.Intn(SlotNumber)
}
return int(crc16sum(key)) % SlotNumber
}
func crc16sum(key string) (crc uint16) {
for i := 0; i < len(key); i++ {
crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff]
}
return
}

15
go-mysql-redis/vendor/gopkg.in/redis.v5/internal/log.go generated vendored Normal file

@ -0,0 +1,15 @@
package internal
import (
"fmt"
"log"
)
var Logger *log.Logger
func Logf(s string, args ...interface{}) {
if Logger == nil {
return
}
Logger.Output(2, fmt.Sprintf(s, args...))
}

78
go-mysql-redis/vendor/gopkg.in/redis.v5/internal/pool/conn.go generated vendored Normal file

@ -0,0 +1,78 @@
package pool
import (
"net"
"sync/atomic"
"time"
"gopkg.in/redis.v5/internal/proto"
)
var noDeadline = time.Time{}
type Conn struct {
netConn net.Conn
Rd *proto.Reader
Wb *proto.WriteBuffer
Inited bool
usedAt atomic.Value
}
func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
netConn: netConn,
Wb: proto.NewWriteBuffer(),
}
cn.Rd = proto.NewReader(cn.netConn)
cn.SetUsedAt(time.Now())
return cn
}
func (cn *Conn) UsedAt() time.Time {
return cn.usedAt.Load().(time.Time)
}
func (cn *Conn) SetUsedAt(tm time.Time) {
cn.usedAt.Store(tm)
}
func (cn *Conn) SetNetConn(netConn net.Conn) {
cn.netConn = netConn
cn.Rd.Reset(netConn)
}
func (cn *Conn) IsStale(timeout time.Duration) bool {
return timeout > 0 && time.Since(cn.UsedAt()) > timeout
}
func (cn *Conn) SetReadTimeout(timeout time.Duration) error {
now := time.Now()
cn.SetUsedAt(now)
if timeout > 0 {
return cn.netConn.SetReadDeadline(now.Add(timeout))
}
return cn.netConn.SetReadDeadline(noDeadline)
}
func (cn *Conn) SetWriteTimeout(timeout time.Duration) error {
now := time.Now()
cn.SetUsedAt(now)
if timeout > 0 {
return cn.netConn.SetWriteDeadline(now.Add(timeout))
}
return cn.netConn.SetWriteDeadline(noDeadline)
}
func (cn *Conn) Write(b []byte) (int, error) {
return cn.netConn.Write(b)
}
func (cn *Conn) RemoteAddr() net.Addr {
return cn.netConn.RemoteAddr()
}
func (cn *Conn) Close() error {
return cn.netConn.Close()
}

354
go-mysql-redis/vendor/gopkg.in/redis.v5/internal/pool/pool.go generated vendored Normal file

@ -0,0 +1,354 @@
package pool
import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"gopkg.in/redis.v5/internal"
)
var (
ErrClosed = errors.New("redis: client is closed")
ErrPoolTimeout = errors.New("redis: connection pool timeout")
errConnStale = errors.New("connection is stale")
)
var timers = sync.Pool{
New: func() interface{} {
t := time.NewTimer(time.Hour)
t.Stop()
return t
},
}
// Stats contains pool state information and accumulated stats.
type Stats struct {
Requests uint32 // number of times a connection was requested by the pool
Hits uint32 // number of times free connection was found in the pool
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // the number of total connections in the pool
FreeConns uint32 // the number of free connections in the pool
}
type Pooler interface {
Get() (*Conn, bool, error)
Put(*Conn) error
Remove(*Conn, error) error
Len() int
FreeLen() int
Stats() *Stats
Close() error
}
type dialer func() (net.Conn, error)
type ConnPool struct {
dial dialer
OnClose func(*Conn) error
poolTimeout time.Duration
idleTimeout time.Duration
queue chan struct{}
connsMu sync.Mutex
conns []*Conn
freeConnsMu sync.Mutex
freeConns []*Conn
stats Stats
_closed int32 // atomic
lastErr atomic.Value
}
var _ Pooler = (*ConnPool)(nil)
func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout, idleCheckFrequency time.Duration) *ConnPool {
p := &ConnPool{
dial: dial,
poolTimeout: poolTimeout,
idleTimeout: idleTimeout,
queue: make(chan struct{}, poolSize),
conns: make([]*Conn, 0, poolSize),
freeConns: make([]*Conn, 0, poolSize),
}
if idleTimeout > 0 && idleCheckFrequency > 0 {
go p.reaper(idleCheckFrequency)
}
return p
}
func (p *ConnPool) NewConn() (*Conn, error) {
netConn, err := p.dial()
if err != nil {
return nil, err
}
return NewConn(netConn), nil
}
func (p *ConnPool) PopFree() *Conn {
timer := timers.Get().(*time.Timer)
timer.Reset(p.poolTimeout)
select {
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
case <-timer.C:
timers.Put(timer)
atomic.AddUint32(&p.stats.Timeouts, 1)
return nil
}
p.freeConnsMu.Lock()
cn := p.popFree()
p.freeConnsMu.Unlock()
if cn == nil {
<-p.queue
}
return cn
}
func (p *ConnPool) popFree() *Conn {
if len(p.freeConns) == 0 {
return nil
}
idx := len(p.freeConns) - 1
cn := p.freeConns[idx]
p.freeConns = p.freeConns[:idx]
return cn
}
// Get returns an existing connection from the pool or creates a new one.
func (p *ConnPool) Get() (*Conn, bool, error) {
if p.closed() {
return nil, false, ErrClosed
}
atomic.AddUint32(&p.stats.Requests, 1)
timer := timers.Get().(*time.Timer)
timer.Reset(p.poolTimeout)
select {
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
case <-timer.C:
timers.Put(timer)
atomic.AddUint32(&p.stats.Timeouts, 1)
return nil, false, ErrPoolTimeout
}
for {
p.freeConnsMu.Lock()
cn := p.popFree()
p.freeConnsMu.Unlock()
if cn == nil {
break
}
if cn.IsStale(p.idleTimeout) {
p.remove(cn, errConnStale)
continue
}
atomic.AddUint32(&p.stats.Hits, 1)
return cn, false, nil
}
newcn, err := p.NewConn()
if err != nil {
<-p.queue
return nil, false, err
}
p.connsMu.Lock()
p.conns = append(p.conns, newcn)
p.connsMu.Unlock()
return newcn, true, nil
}
func (p *ConnPool) Put(cn *Conn) error {
if data := cn.Rd.PeekBuffered(); data != nil {
err := fmt.Errorf("connection has unread data: %q", data)
internal.Logf(err.Error())
return p.Remove(cn, err)
}
p.freeConnsMu.Lock()
p.freeConns = append(p.freeConns, cn)
p.freeConnsMu.Unlock()
<-p.queue
return nil
}
func (p *ConnPool) Remove(cn *Conn, reason error) error {
p.remove(cn, reason)
<-p.queue
return nil
}
func (p *ConnPool) remove(cn *Conn, reason error) {
_ = p.closeConn(cn, reason)
p.connsMu.Lock()
for i, c := range p.conns {
if c == cn {
p.conns = append(p.conns[:i], p.conns[i+1:]...)
break
}
}
p.connsMu.Unlock()
}
// Len returns total number of connections.
func (p *ConnPool) Len() int {
p.connsMu.Lock()
l := len(p.conns)
p.connsMu.Unlock()
return l
}
// FreeLen returns number of free connections.
func (p *ConnPool) FreeLen() int {
p.freeConnsMu.Lock()
l := len(p.freeConns)
p.freeConnsMu.Unlock()
return l
}
func (p *ConnPool) Stats() *Stats {
return &Stats{
Requests: atomic.LoadUint32(&p.stats.Requests),
Hits: atomic.LoadUint32(&p.stats.Hits),
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
FreeConns: uint32(p.FreeLen()),
}
}
func (p *ConnPool) closed() bool {
return atomic.LoadInt32(&p._closed) == 1
}
func (p *ConnPool) Close() error {
if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) {
return ErrClosed
}
p.connsMu.Lock()
var firstErr error
for _, cn := range p.conns {
if cn == nil {
continue
}
if err := p.closeConn(cn, ErrClosed); err != nil && firstErr == nil {
firstErr = err
}
}
p.conns = nil
p.connsMu.Unlock()
p.freeConnsMu.Lock()
p.freeConns = nil
p.freeConnsMu.Unlock()
return firstErr
}
func (p *ConnPool) closeConn(cn *Conn, reason error) error {
if p.OnClose != nil {
_ = p.OnClose(cn)
}
return cn.Close()
}
func (p *ConnPool) reapStaleConn() bool {
if len(p.freeConns) == 0 {
return false
}
cn := p.freeConns[0]
if !cn.IsStale(p.idleTimeout) {
return false
}
p.remove(cn, errConnStale)
p.freeConns = append(p.freeConns[:0], p.freeConns[1:]...)
return true
}
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
p.queue <- struct{}{}
p.freeConnsMu.Lock()
reaped := p.reapStaleConn()
p.freeConnsMu.Unlock()
<-p.queue
if reaped {
n++
} else {
break
}
}
return n, nil
}
func (p *ConnPool) reaper(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for range ticker.C {
if p.closed() {
break
}
n, err := p.ReapStaleConns()
if err != nil {
internal.Logf("ReapStaleConns failed: %s", err)
continue
}
s := p.Stats()
internal.Logf(
"reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
)
}
}
//------------------------------------------------------------------------------
var idleCheckFrequency atomic.Value
func SetIdleCheckFrequency(d time.Duration) {
idleCheckFrequency.Store(d)
}
func getIdleCheckFrequency() time.Duration {
v := idleCheckFrequency.Load()
if v == nil {
return time.Minute
}
return v.(time.Duration)
}

View file

@ -0,0 +1,47 @@
package pool
type SingleConnPool struct {
cn *Conn
}
var _ Pooler = (*SingleConnPool)(nil)
func NewSingleConnPool(cn *Conn) *SingleConnPool {
return &SingleConnPool{
cn: cn,
}
}
func (p *SingleConnPool) Get() (*Conn, bool, error) {
return p.cn, false, nil
}
func (p *SingleConnPool) Put(cn *Conn) error {
if p.cn != cn {
panic("p.cn != cn")
}
return nil
}
func (p *SingleConnPool) Remove(cn *Conn, _ error) error {
if p.cn != cn {
panic("p.cn != cn")
}
return nil
}
func (p *SingleConnPool) Len() int {
return 1
}
func (p *SingleConnPool) FreeLen() int {
return 0
}
func (p *SingleConnPool) Stats() *Stats {
return nil
}
func (p *SingleConnPool) Close() error {
return nil
}

View file

@ -0,0 +1,119 @@
package pool
import (
"errors"
"sync"
)
type StickyConnPool struct {
pool *ConnPool
reusable bool
cn *Conn
closed bool
mu sync.Mutex
}
var _ Pooler = (*StickyConnPool)(nil)
func NewStickyConnPool(pool *ConnPool, reusable bool) *StickyConnPool {
return &StickyConnPool{
pool: pool,
reusable: reusable,
}
}
func (p *StickyConnPool) Get() (*Conn, bool, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return nil, false, ErrClosed
}
if p.cn != nil {
return p.cn, false, nil
}
cn, _, err := p.pool.Get()
if err != nil {
return nil, false, err
}
p.cn = cn
return cn, true, nil
}
func (p *StickyConnPool) putUpstream() (err error) {
err = p.pool.Put(p.cn)
p.cn = nil
return err
}
func (p *StickyConnPool) Put(cn *Conn) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return ErrClosed
}
return nil
}
func (p *StickyConnPool) removeUpstream(reason error) error {
err := p.pool.Remove(p.cn, reason)
p.cn = nil
return err
}
func (p *StickyConnPool) Remove(cn *Conn, reason error) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return nil
}
return p.removeUpstream(reason)
}
func (p *StickyConnPool) Len() int {
p.mu.Lock()
defer p.mu.Unlock()
if p.cn == nil {
return 0
}
return 1
}
func (p *StickyConnPool) FreeLen() int {
p.mu.Lock()
defer p.mu.Unlock()
if p.cn == nil {
return 1
}
return 0
}
func (p *StickyConnPool) Stats() *Stats {
return nil
}
func (p *StickyConnPool) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return ErrClosed
}
p.closed = true
var err error
if p.cn != nil {
if p.reusable {
err = p.putUpstream()
} else {
reason := errors.New("redis: unreusable sticky connection")
err = p.removeUpstream(reason)
}
}
return err
}

View file

@ -0,0 +1,334 @@
package proto
import (
"bufio"
"fmt"
"io"
"strconv"
"gopkg.in/redis.v5/internal"
)
const bytesAllocLimit = 1024 * 1024 // 1mb
const (
ErrorReply = '-'
StatusReply = '+'
IntReply = ':'
StringReply = '$'
ArrayReply = '*'
)
type MultiBulkParse func(*Reader, int64) (interface{}, error)
type Reader struct {
src *bufio.Reader
buf []byte
}
func NewReader(rd io.Reader) *Reader {
return &Reader{
src: bufio.NewReader(rd),
buf: make([]byte, 4096),
}
}
func (r *Reader) Reset(rd io.Reader) {
r.src.Reset(rd)
}
func (p *Reader) PeekBuffered() []byte {
if n := p.src.Buffered(); n != 0 {
b, _ := p.src.Peek(n)
return b
}
return nil
}
func (p *Reader) ReadN(n int) ([]byte, error) {
b, err := readN(p.src, p.buf, n)
if err != nil {
return nil, err
}
p.buf = b
return b, nil
}
func (p *Reader) ReadLine() ([]byte, error) {
line, isPrefix, err := p.src.ReadLine()
if err != nil {
return nil, err
}
if isPrefix {
return nil, bufio.ErrBufferFull
}
if len(line) == 0 {
return nil, internal.RedisError("redis: reply is empty")
}
if isNilReply(line) {
return nil, internal.Nil
}
return line, nil
}
func (p *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
line, err := p.ReadLine()
if err != nil {
return nil, err
}
switch line[0] {
case ErrorReply:
return nil, ParseErrorReply(line)
case StatusReply:
return parseStatusValue(line), nil
case IntReply:
return parseInt(line[1:], 10, 64)
case StringReply:
return p.readTmpBytesValue(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
return nil, err
}
return m(p, n)
}
return nil, fmt.Errorf("redis: can't parse %.100q", line)
}
func (p *Reader) ReadIntReply() (int64, error) {
line, err := p.ReadLine()
if err != nil {
return 0, err
}
switch line[0] {
case ErrorReply:
return 0, ParseErrorReply(line)
case IntReply:
return parseInt(line[1:], 10, 64)
default:
return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
}
}
func (p *Reader) ReadTmpBytesReply() ([]byte, error) {
line, err := p.ReadLine()
if err != nil {
return nil, err
}
switch line[0] {
case ErrorReply:
return nil, ParseErrorReply(line)
case StringReply:
return p.readTmpBytesValue(line)
case StatusReply:
return parseStatusValue(line), nil
default:
return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
}
}
func (r *Reader) ReadBytesReply() ([]byte, error) {
b, err := r.ReadTmpBytesReply()
if err != nil {
return nil, err
}
cp := make([]byte, len(b))
copy(cp, b)
return cp, nil
}
func (p *Reader) ReadStringReply() (string, error) {
b, err := p.ReadTmpBytesReply()
if err != nil {
return "", err
}
return string(b), nil
}
func (p *Reader) ReadFloatReply() (float64, error) {
b, err := p.ReadTmpBytesReply()
if err != nil {
return 0, err
}
return parseFloat(b, 64)
}
func (p *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
line, err := p.ReadLine()
if err != nil {
return nil, err
}
switch line[0] {
case ErrorReply:
return nil, ParseErrorReply(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
return nil, err
}
return m(p, n)
default:
return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
}
}
func (p *Reader) ReadArrayLen() (int64, error) {
line, err := p.ReadLine()
if err != nil {
return 0, err
}
switch line[0] {
case ErrorReply:
return 0, ParseErrorReply(line)
case ArrayReply:
return parseArrayLen(line)
default:
return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
}
}
func (p *Reader) ReadScanReply() ([]string, uint64, error) {
n, err := p.ReadArrayLen()
if err != nil {
return nil, 0, err
}
if n != 2 {
return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
}
cursor, err := p.ReadUint()
if err != nil {
return nil, 0, err
}
n, err = p.ReadArrayLen()
if err != nil {
return nil, 0, err
}
keys := make([]string, n)
for i := int64(0); i < n; i++ {
key, err := p.ReadStringReply()
if err != nil {
return nil, 0, err
}
keys[i] = key
}
return keys, cursor, err
}
func (p *Reader) readTmpBytesValue(line []byte) ([]byte, error) {
if isNilReply(line) {
return nil, internal.Nil
}
replyLen, err := strconv.Atoi(string(line[1:]))
if err != nil {
return nil, err
}
b, err := p.ReadN(replyLen + 2)
if err != nil {
return nil, err
}
return b[:replyLen], nil
}
func (r *Reader) ReadInt() (int64, error) {
b, err := r.ReadTmpBytesReply()
if err != nil {
return 0, err
}
return parseInt(b, 10, 64)
}
func (r *Reader) ReadUint() (uint64, error) {
b, err := r.ReadTmpBytesReply()
if err != nil {
return 0, err
}
return parseUint(b, 10, 64)
}
// --------------------------------------------------------------------
func readN(r io.Reader, b []byte, n int) ([]byte, error) {
if n == 0 && b == nil {
return make([]byte, 0), nil
}
if cap(b) >= n {
b = b[:n]
_, err := io.ReadFull(r, b)
return b, err
}
b = b[:cap(b)]
pos := 0
for pos < n {
diff := n - len(b)
if diff > bytesAllocLimit {
diff = bytesAllocLimit
}
b = append(b, make([]byte, diff)...)
nn, err := io.ReadFull(r, b[pos:])
if err != nil {
return nil, err
}
pos += nn
}
return b, nil
}
func formatInt(n int64) string {
return strconv.FormatInt(n, 10)
}
func formatUint(u uint64) string {
return strconv.FormatUint(u, 10)
}
func formatFloat(f float64) string {
return strconv.FormatFloat(f, 'f', -1, 64)
}
func isNilReply(b []byte) bool {
return len(b) == 3 &&
(b[0] == StringReply || b[0] == ArrayReply) &&
b[1] == '-' && b[2] == '1'
}
func ParseErrorReply(line []byte) error {
return internal.RedisError(string(line[1:]))
}
func parseStatusValue(line []byte) []byte {
return line[1:]
}
func parseArrayLen(line []byte) (int64, error) {
if isNilReply(line) {
return 0, internal.Nil
}
return parseInt(line[1:], 10, 64)
}
func atoi(b []byte) (int, error) {
return strconv.Atoi(internal.BytesToString(b))
}
func parseInt(b []byte, base int, bitSize int) (int64, error) {
return strconv.ParseInt(internal.BytesToString(b), base, bitSize)
}
func parseUint(b []byte, base int, bitSize int) (uint64, error) {
return strconv.ParseUint(internal.BytesToString(b), base, bitSize)
}
func parseFloat(b []byte, bitSize int) (float64, error) {
return strconv.ParseFloat(internal.BytesToString(b), bitSize)
}
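A short in-package sketch (not part of the library) of the Reader parsing canned RESP replies; it assumes an extra "strings" import, and the reply values are made up for illustration.

func readerSketch() error {
	rd := NewReader(strings.NewReader("+OK\r\n:42\r\n$5\r\nhello\r\n"))
	status, err := rd.ReadTmpBytesReply() // "OK": status reply with '+' stripped
	if err != nil {
		return err
	}
	n, err := rd.ReadIntReply() // 42: integer reply after ':'
	if err != nil {
		return err
	}
	s, err := rd.ReadStringReply() // "hello": bulk string of declared length 5
	if err != nil {
		return err
	}
	_, _, _ = status, n, s
	return nil
}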

View file

@ -0,0 +1,131 @@
package proto
import (
"encoding"
"fmt"
"reflect"
"gopkg.in/redis.v5/internal"
)
func Scan(b []byte, v interface{}) error {
switch v := v.(type) {
case nil:
return internal.RedisError("redis: Scan(nil)")
case *string:
*v = internal.BytesToString(b)
return nil
case *[]byte:
*v = b
return nil
case *int:
var err error
*v, err = atoi(b)
return err
case *int8:
n, err := parseInt(b, 10, 8)
if err != nil {
return err
}
*v = int8(n)
return nil
case *int16:
n, err := parseInt(b, 10, 16)
if err != nil {
return err
}
*v = int16(n)
return nil
case *int32:
n, err := parseInt(b, 10, 32)
if err != nil {
return err
}
*v = int32(n)
return nil
case *int64:
n, err := parseInt(b, 10, 64)
if err != nil {
return err
}
*v = n
return nil
case *uint:
n, err := parseUint(b, 10, 64)
if err != nil {
return err
}
*v = uint(n)
return nil
case *uint8:
n, err := parseUint(b, 10, 8)
if err != nil {
return err
}
*v = uint8(n)
return nil
case *uint16:
n, err := parseUint(b, 10, 16)
if err != nil {
return err
}
*v = uint16(n)
return nil
case *uint32:
n, err := parseUint(b, 10, 32)
if err != nil {
return err
}
*v = uint32(n)
return nil
case *uint64:
n, err := parseUint(b, 10, 64)
if err != nil {
return err
}
*v = n
return nil
case *float32:
n, err := parseFloat(b, 32)
if err != nil {
return err
}
*v = float32(n)
return err
case *float64:
var err error
*v, err = parseFloat(b, 64)
return err
case *bool:
*v = len(b) == 1 && b[0] == '1'
return nil
case encoding.BinaryUnmarshaler:
return v.UnmarshalBinary(b)
default:
return fmt.Errorf(
"redis: can't unmarshal %T (consider implementing BinaryUnmarshaler)", v)
}
}
func ScanSlice(data []string, slice interface{}) error {
v := reflect.ValueOf(slice)
if !v.IsValid() {
return fmt.Errorf("redis: ScanSlice(nil)")
}
if v.Kind() != reflect.Ptr {
return fmt.Errorf("redis: ScanSlice(non-pointer %T)", slice)
}
v = v.Elem()
if v.Kind() != reflect.Slice {
return fmt.Errorf("redis: ScanSlice(non-slice %T)", slice)
}
for i, s := range data {
elem := internal.SliceNextElem(v)
if err := Scan([]byte(s), elem.Addr().Interface()); err != nil {
return fmt.Errorf("redis: ScanSlice(index=%d value=%q) failed: %s", i, s, err)
}
}
return nil
}
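A short in-package sketch (not part of the library): Scan decodes a single reply's bytes into a Go value, and ScanSlice applies Scan element-wise to fill a slice. The literals are illustrative.

func scanSketch() error {
	var n int
	if err := Scan([]byte("42"), &n); err != nil { // n == 42
		return err
	}
	var nums []int
	return ScanSlice([]string{"1", "2", "3"}, &nums) // nums == [1 2 3]
}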

View file

@ -0,0 +1,105 @@
package proto
import (
"encoding"
"fmt"
"strconv"
)
const bufferSize = 4096
type WriteBuffer struct {
b []byte
}
func NewWriteBuffer() *WriteBuffer {
return &WriteBuffer{
b: make([]byte, 0, bufferSize),
}
}
func (w *WriteBuffer) Len() int { return len(w.b) }
func (w *WriteBuffer) Bytes() []byte { return w.b }
func (w *WriteBuffer) Reset() { w.b = w.b[:0] }
func (w *WriteBuffer) Append(args []interface{}) error {
w.b = append(w.b, ArrayReply)
w.b = strconv.AppendUint(w.b, uint64(len(args)), 10)
w.b = append(w.b, '\r', '\n')
for _, arg := range args {
if err := w.append(arg); err != nil {
return err
}
}
return nil
}
func (w *WriteBuffer) append(val interface{}) error {
switch v := val.(type) {
case nil:
w.AppendString("")
case string:
w.AppendString(v)
case []byte:
w.AppendBytes(v)
case int:
w.AppendString(formatInt(int64(v)))
case int8:
w.AppendString(formatInt(int64(v)))
case int16:
w.AppendString(formatInt(int64(v)))
case int32:
w.AppendString(formatInt(int64(v)))
case int64:
w.AppendString(formatInt(v))
case uint:
w.AppendString(formatUint(uint64(v)))
case uint8:
w.AppendString(formatUint(uint64(v)))
case uint16:
w.AppendString(formatUint(uint64(v)))
case uint32:
w.AppendString(formatUint(uint64(v)))
case uint64:
w.AppendString(formatUint(v))
case float32:
w.AppendString(formatFloat(float64(v)))
case float64:
w.AppendString(formatFloat(v))
case bool:
if v {
w.AppendString("1")
} else {
w.AppendString("0")
}
default:
if bm, ok := val.(encoding.BinaryMarshaler); ok {
bb, err := bm.MarshalBinary()
if err != nil {
return err
}
w.AppendBytes(bb)
} else {
return fmt.Errorf(
"redis: can't marshal %T (consider implementing encoding.BinaryMarshaler)", val)
}
}
return nil
}
func (w *WriteBuffer) AppendString(s string) {
w.b = append(w.b, StringReply)
w.b = strconv.AppendUint(w.b, uint64(len(s)), 10)
w.b = append(w.b, '\r', '\n')
w.b = append(w.b, s...)
w.b = append(w.b, '\r', '\n')
}
func (w *WriteBuffer) AppendBytes(p []byte) {
w.b = append(w.b, StringReply)
w.b = strconv.AppendUint(w.b, uint64(len(p)), 10)
w.b = append(w.b, '\r', '\n')
w.b = append(w.b, p...)
w.b = append(w.b, '\r', '\n')
}
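A short in-package sketch (not part of the library) showing the RESP encoding WriteBuffer produces for a command; it assumes an extra "fmt" import.

func writeBufferSketch() {
	wb := NewWriteBuffer()
	_ = wb.Append([]interface{}{"SET", "key", "value"})
	// Prints "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n":
	// an array of three bulk strings, each prefixed with its length.
	fmt.Printf("%q\n", wb.Bytes())
}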

View file

@ -0,0 +1,7 @@
// +build appengine
package internal
func BytesToString(b []byte) string {
return string(b)
}

View file

@ -0,0 +1,14 @@
// +build !appengine
package internal
import (
"reflect"
"unsafe"
)
func BytesToString(b []byte) string {
bytesHeader := (*reflect.SliceHeader)(unsafe.Pointer(&b))
strHeader := reflect.StringHeader{Data: bytesHeader.Data, Len: bytesHeader.Len}
return *(*string)(unsafe.Pointer(&strHeader))
}

View file

@ -0,0 +1,47 @@
package internal
import "reflect"
func ToLower(s string) string {
if isLower(s) {
return s
}
b := make([]byte, len(s))
for i := range b {
c := s[i]
if c >= 'A' && c <= 'Z' {
c += 'a' - 'A'
}
b[i] = c
}
return BytesToString(b)
}
func isLower(s string) bool {
for i := 0; i < len(s); i++ {
c := s[i]
if c >= 'A' && c <= 'Z' {
return false
}
}
return true
}
func SliceNextElem(v reflect.Value) reflect.Value {
if v.Len() < v.Cap() {
v.Set(v.Slice(0, v.Len()+1))
return v.Index(v.Len() - 1)
}
elemType := v.Type().Elem()
if elemType.Kind() == reflect.Ptr {
elem := reflect.New(elemType.Elem())
v.Set(reflect.Append(v, elem))
return elem.Elem()
}
v.Set(reflect.Append(v, reflect.Zero(elemType)))
return v.Index(v.Len() - 1)
}

73
go-mysql-redis/vendor/gopkg.in/redis.v5/iterator.go generated vendored Normal file
View file

@ -0,0 +1,73 @@
package redis
import "sync"
// ScanIterator is used to incrementally iterate over a collection of elements.
// It's safe for concurrent use by multiple goroutines.
type ScanIterator struct {
mu sync.Mutex // protects cmd and pos
cmd *ScanCmd
pos int
}
// Err returns the last iterator error, if any.
func (it *ScanIterator) Err() error {
it.mu.Lock()
err := it.cmd.Err()
it.mu.Unlock()
return err
}
// Next advances the cursor and returns true if more values can be read.
func (it *ScanIterator) Next() bool {
it.mu.Lock()
defer it.mu.Unlock()
// Return immediately on errors.
if it.cmd.Err() != nil {
return false
}
// Advance cursor, check if we are still within range.
if it.pos < len(it.cmd.page) {
it.pos++
return true
}
for {
// Return if there is no more data to fetch.
if it.cmd.cursor == 0 {
return false
}
// Fetch next page.
if it.cmd._args[0] == "scan" {
it.cmd._args[1] = it.cmd.cursor
} else {
it.cmd._args[2] = it.cmd.cursor
}
err := it.cmd.process(it.cmd)
if err != nil {
return false
}
it.pos = 1
// Redis can occasionally return an empty page.
if len(it.cmd.page) > 0 {
return true
}
}
}
// Val returns the key/field at the current cursor position.
func (it *ScanIterator) Val() string {
var v string
it.mu.Lock()
if it.cmd.Err() == nil && it.pos > 0 && it.pos <= len(it.cmd.page) {
v = it.cmd.page[it.pos-1]
}
it.mu.Unlock()
return v
}
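A hedged usage sketch: keys are scanned through the public API, assuming a reachable Redis server and the ScanCmd.Iterator constructor from the command layer (not shown in this diff). The match pattern and count are illustrative.

package main

import (
	"fmt"

	"gopkg.in/redis.v5"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	iter := client.Scan(0, "user:*", 100).Iterator()
	for iter.Next() { // fetches the next SCAN page transparently
		fmt.Println(iter.Val())
	}
	if err := iter.Err(); err != nil {
		panic(err)
	}
}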

185
go-mysql-redis/vendor/gopkg.in/redis.v5/options.go generated vendored Normal file
View file

@ -0,0 +1,185 @@
package redis
import (
"crypto/tls"
"errors"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"time"
"gopkg.in/redis.v5/internal/pool"
)
type Options struct {
// The network type, either tcp or unix.
// Default is tcp.
Network string
// host:port address.
Addr string
// Dialer creates a new network connection and has priority over
// the Network and Addr options.
Dialer func() (net.Conn, error)
// Optional password. Must match the password specified in the
// requirepass server configuration option.
Password string
// Database to be selected after connecting to the server.
DB int
// Maximum number of retries before giving up.
// Default is to not retry failed commands.
MaxRetries int
// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads. If reached, commands will fail
// with a timeout instead of blocking.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes. If reached, commands will fail
// with a timeout instead of blocking.
// Default is 3 seconds.
WriteTimeout time.Duration
// Maximum number of socket connections.
// Default is 10 connections.
PoolSize int
// Amount of time the client waits for a connection if all
// connections are busy before returning an error.
// Default is ReadTimeout + 1 second.
PoolTimeout time.Duration
// Amount of time after which the client closes idle connections.
// Should be less than the server's timeout.
// Default is to not close idle connections.
IdleTimeout time.Duration
// Frequency of idle checks.
// Default is 1 minute.
// When a negative value is set, the idle check is disabled.
IdleCheckFrequency time.Duration
// Enables read-only queries on slave nodes.
ReadOnly bool
// TLS config to use. When set, TLS will be negotiated.
TLSConfig *tls.Config
}
func (opt *Options) init() {
if opt.Network == "" {
opt.Network = "tcp"
}
if opt.Dialer == nil {
opt.Dialer = func() (net.Conn, error) {
conn, err := net.DialTimeout(opt.Network, opt.Addr, opt.DialTimeout)
if opt.TLSConfig == nil || err != nil {
return conn, err
}
t := tls.Client(conn, opt.TLSConfig)
return t, t.Handshake()
}
}
if opt.PoolSize == 0 {
opt.PoolSize = 10
}
if opt.DialTimeout == 0 {
opt.DialTimeout = 5 * time.Second
}
if opt.ReadTimeout == 0 {
opt.ReadTimeout = 3 * time.Second
} else if opt.ReadTimeout == -1 {
opt.ReadTimeout = 0
}
if opt.WriteTimeout == 0 {
opt.WriteTimeout = opt.ReadTimeout
} else if opt.WriteTimeout == -1 {
opt.WriteTimeout = 0
}
if opt.PoolTimeout == 0 {
opt.PoolTimeout = opt.ReadTimeout + time.Second
}
if opt.IdleTimeout == 0 {
opt.IdleTimeout = 5 * time.Minute
}
if opt.IdleCheckFrequency == 0 {
opt.IdleCheckFrequency = time.Minute
}
}
// ParseURL parses a Redis URL into Options that can be used to connect to Redis.
func ParseURL(redisURL string) (*Options, error) {
o := &Options{Network: "tcp"}
u, err := url.Parse(redisURL)
if err != nil {
return nil, err
}
if u.Scheme != "redis" && u.Scheme != "rediss" {
return nil, errors.New("invalid redis URL scheme: " + u.Scheme)
}
if u.User != nil {
if p, ok := u.User.Password(); ok {
o.Password = p
}
}
if len(u.Query()) > 0 {
return nil, errors.New("no options supported")
}
h, p, err := net.SplitHostPort(u.Host)
if err != nil {
h = u.Host
}
if h == "" {
h = "localhost"
}
if p == "" {
p = "6379"
}
o.Addr = net.JoinHostPort(h, p)
f := strings.FieldsFunc(u.Path, func(r rune) bool {
return r == '/'
})
switch len(f) {
case 0:
o.DB = 0
case 1:
if o.DB, err = strconv.Atoi(f[0]); err != nil {
return nil, fmt.Errorf("invalid redis database number: %q", f[0])
}
default:
return nil, errors.New("invalid redis URL path: " + u.Path)
}
if u.Scheme == "rediss" {
o.TLSConfig = &tls.Config{ServerName: h}
}
return o, nil
}
func newConnPool(opt *Options) *pool.ConnPool {
return pool.NewConnPool(
opt.Dialer,
opt.PoolSize,
opt.PoolTimeout,
opt.IdleTimeout,
opt.IdleCheckFrequency,
)
}
// PoolStats contains pool state information and accumulated stats.
type PoolStats struct {
Requests uint32 // number of times a connection was requested by the pool
Hits uint32 // number of times free connection was found in the pool
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // the number of total connections in the pool
FreeConns uint32 // the number of free connections in the pool
}
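A minimal sketch of building a client from a URL via ParseURL; the address, password, and database number are assumptions. Note that query parameters are rejected, as the code above shows.

package main

import (
	"fmt"

	"gopkg.in/redis.v5"
)

func main() {
	opt, err := redis.ParseURL("redis://:secret@localhost:6379/1")
	if err != nil {
		panic(err) // e.g. an unsupported query option or a bad DB number
	}
	client := redis.NewClient(opt)
	fmt.Println(client.Ping().Result())
}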

400
go-mysql-redis/vendor/gopkg.in/redis.v5/parser.go generated vendored Normal file
View file

@ -0,0 +1,400 @@
package redis
import (
"fmt"
"net"
"strconv"
"time"
"gopkg.in/redis.v5/internal/proto"
)
// Implements proto.MultiBulkParse
func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
vals := make([]interface{}, 0, n)
for i := int64(0); i < n; i++ {
v, err := rd.ReadReply(sliceParser)
if err == Nil {
vals = append(vals, nil)
} else if err != nil {
return nil, err
} else {
switch vv := v.(type) {
case []byte:
vals = append(vals, string(vv))
default:
vals = append(vals, v)
}
}
}
return vals, nil
}
// Implements proto.MultiBulkParse
func intSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
ints := make([]int64, 0, n)
for i := int64(0); i < n; i++ {
n, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
ints = append(ints, n)
}
return ints, nil
}
// Implements proto.MultiBulkParse
func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
bools := make([]bool, 0, n)
for i := int64(0); i < n; i++ {
n, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
bools = append(bools, n == 1)
}
return bools, nil
}
// Implements proto.MultiBulkParse
func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
ss := make([]string, 0, n)
for i := int64(0); i < n; i++ {
s, err := rd.ReadStringReply()
if err == Nil {
ss = append(ss, "")
} else if err != nil {
return nil, err
} else {
ss = append(ss, s)
}
}
return ss, nil
}
// Implements proto.MultiBulkParse
func floatSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
nn := make([]float64, 0, n)
for i := int64(0); i < n; i++ {
n, err := rd.ReadFloatReply()
if err != nil {
return nil, err
}
nn = append(nn, n)
}
return nn, nil
}
// Implements proto.MultiBulkParse
func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
m := make(map[string]string, n/2)
for i := int64(0); i < n; i += 2 {
key, err := rd.ReadStringReply()
if err != nil {
return nil, err
}
value, err := rd.ReadStringReply()
if err != nil {
return nil, err
}
m[key] = value
}
return m, nil
}
// Implements proto.MultiBulkParse
func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
m := make(map[string]int64, n/2)
for i := int64(0); i < n; i += 2 {
key, err := rd.ReadStringReply()
if err != nil {
return nil, err
}
n, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
m[key] = n
}
return m, nil
}
// Implements proto.MultiBulkParse
func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
zz := make([]Z, n/2)
for i := int64(0); i < n; i += 2 {
var err error
z := &zz[i/2]
z.Member, err = rd.ReadStringReply()
if err != nil {
return nil, err
}
z.Score, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
}
return zz, nil
}
// Implements proto.MultiBulkParse
func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
slots := make([]ClusterSlot, n)
for i := 0; i < len(slots); i++ {
n, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}
if n < 2 {
err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n)
return nil, err
}
start, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
end, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
nodes := make([]ClusterNode, n-2)
for j := 0; j < len(nodes); j++ {
n, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}
if n != 2 && n != 3 {
err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n)
return nil, err
}
ip, err := rd.ReadStringReply()
if err != nil {
return nil, err
}
port, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
nodes[j].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
if n == 3 {
id, err := rd.ReadStringReply()
if err != nil {
return nil, err
}
nodes[j].Id = id
}
}
slots[i] = ClusterSlot{
Start: int(start),
End: int(end),
Nodes: nodes,
}
}
return slots, nil
}
func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse {
return func(rd *proto.Reader, n int64) (interface{}, error) {
var loc GeoLocation
var err error
loc.Name, err = rd.ReadStringReply()
if err != nil {
return nil, err
}
if q.WithDist {
loc.Dist, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
}
if q.WithGeoHash {
loc.GeoHash, err = rd.ReadIntReply()
if err != nil {
return nil, err
}
}
if q.WithCoord {
n, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}
if n != 2 {
return nil, fmt.Errorf("got %d coordinates, expected 2", n)
}
loc.Longitude, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
loc.Latitude, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
}
return &loc, nil
}
}
func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse {
return func(rd *proto.Reader, n int64) (interface{}, error) {
locs := make([]GeoLocation, 0, n)
for i := int64(0); i < n; i++ {
v, err := rd.ReadReply(newGeoLocationParser(q))
if err != nil {
return nil, err
}
switch vv := v.(type) {
case []byte:
locs = append(locs, GeoLocation{
Name: string(vv),
})
case *GeoLocation:
locs = append(locs, *vv)
default:
return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v)
}
}
return locs, nil
}
}
func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) {
var pos GeoPos
var err error
pos.Longitude, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
pos.Latitude, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
return &pos, nil
}
func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
positions := make([]*GeoPos, 0, n)
for i := int64(0); i < n; i++ {
v, err := rd.ReadReply(geoPosParser)
if err != nil {
if err == Nil {
positions = append(positions, nil)
continue
}
return nil, err
}
switch v := v.(type) {
case *GeoPos:
positions = append(positions, v)
default:
return nil, fmt.Errorf("got %T, expected *GeoPos", v)
}
}
return positions, nil
}
func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
var cmd CommandInfo
var err error
if n != 6 {
return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n)
}
cmd.Name, err = rd.ReadStringReply()
if err != nil {
return nil, err
}
arity, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
cmd.Arity = int8(arity)
flags, err := rd.ReadReply(stringSliceParser)
if err != nil {
return nil, err
}
cmd.Flags = flags.([]string)
firstKeyPos, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
cmd.FirstKeyPos = int8(firstKeyPos)
lastKeyPos, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
cmd.LastKeyPos = int8(lastKeyPos)
stepCount, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
cmd.StepCount = int8(stepCount)
for _, flag := range cmd.Flags {
if flag == "readonly" {
cmd.ReadOnly = true
break
}
}
return &cmd, nil
}
// Implements proto.MultiBulkParse
func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
m := make(map[string]*CommandInfo, n)
for i := int64(0); i < n; i++ {
v, err := rd.ReadReply(commandInfoParser)
if err != nil {
return nil, err
}
vv := v.(*CommandInfo)
m[vv.Name] = vv
}
return m, nil
}
// Implements proto.MultiBulkParse
func timeParser(rd *proto.Reader, n int64) (interface{}, error) {
if n != 2 {
return nil, fmt.Errorf("got %d elements, expected 2", n)
}
sec, err := rd.ReadInt()
if err != nil {
return nil, err
}
microsec, err := rd.ReadInt()
if err != nil {
return nil, err
}
return time.Unix(sec, microsec*1000), nil
}

88
go-mysql-redis/vendor/gopkg.in/redis.v5/pipeline.go generated vendored Normal file
View file

@ -0,0 +1,88 @@
package redis
import (
"errors"
"sync"
"gopkg.in/redis.v5/internal/pool"
)
type pipelineExecer func([]Cmder) error
// Pipeline implements pipelining as described in
// http://redis.io/topics/pipelining. It's safe for concurrent use
// by multiple goroutines.
type Pipeline struct {
cmdable
statefulCmdable
exec pipelineExecer
mu sync.Mutex
cmds []Cmder
closed bool
}
func (c *Pipeline) Process(cmd Cmder) error {
c.mu.Lock()
c.cmds = append(c.cmds, cmd)
c.mu.Unlock()
return nil
}
// Close closes the pipeline, releasing any open resources.
func (c *Pipeline) Close() error {
c.mu.Lock()
c.discard()
c.closed = true
c.mu.Unlock()
return nil
}
// Discard resets the pipeline and discards queued commands.
func (c *Pipeline) Discard() error {
c.mu.Lock()
err := c.discard()
c.mu.Unlock()
return err
}
func (c *Pipeline) discard() error {
if c.closed {
return pool.ErrClosed
}
c.cmds = c.cmds[:0]
return nil
}
// Exec executes all previously queued commands using one
// client-server roundtrip.
//
// Exec always returns the list of commands and the error of the first
// failed command, if any.
func (c *Pipeline) Exec() ([]Cmder, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil, pool.ErrClosed
}
if len(c.cmds) == 0 {
return nil, errors.New("redis: pipeline is empty")
}
cmds := c.cmds
c.cmds = nil
return cmds, c.exec(cmds)
}
func (c *Pipeline) pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
if err := fn(c); err != nil {
return nil, err
}
cmds, err := c.Exec()
_ = c.Close()
return cmds, err
}
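A minimal usage sketch, assuming a reachable Redis server: commands are queued locally and sent in a single round trip by Exec, after which each queued command's result is populated.

package main

import (
	"fmt"
	"time"

	"gopkg.in/redis.v5"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	pipe := client.Pipeline()
	incr := pipe.Incr("counter")      // queued, not yet sent
	pipe.Expire("counter", time.Hour) // queued
	if _, err := pipe.Exec(); err != nil { // one client-server round trip
		panic(err)
	}
	fmt.Println(incr.Val()) // populated by Exec
}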

311
go-mysql-redis/vendor/gopkg.in/redis.v5/pubsub.go generated vendored Normal file
View file

@ -0,0 +1,311 @@
package redis
import (
"fmt"
"net"
"sync"
"time"
"gopkg.in/redis.v5/internal"
"gopkg.in/redis.v5/internal/pool"
)
// PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
// multiple goroutines.
type PubSub struct {
base baseClient
cmd *Cmd
mu sync.Mutex
channels []string
patterns []string
}
func (c *PubSub) conn() (*pool.Conn, bool, error) {
cn, isNew, err := c.base.conn()
if err != nil {
return nil, false, err
}
if isNew {
c.resubscribe()
}
return cn, isNew, nil
}
func (c *PubSub) putConn(cn *pool.Conn, err error) {
c.base.putConn(cn, err, true)
}
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
args := make([]interface{}, 1+len(channels))
args[0] = redisCmd
for i, channel := range channels {
args[1+i] = channel
}
cmd := NewSliceCmd(args...)
cn, _, err := c.conn()
if err != nil {
return err
}
cn.SetWriteTimeout(c.base.opt.WriteTimeout)
err = writeCmd(cn, cmd)
c.putConn(cn, err)
return err
}
// Subscribe subscribes the client to the specified channels.
func (c *PubSub) Subscribe(channels ...string) error {
err := c.subscribe("SUBSCRIBE", channels...)
if err == nil {
c.channels = appendIfNotExists(c.channels, channels...)
}
return err
}
// PSubscribe subscribes the client to the given patterns.
func (c *PubSub) PSubscribe(patterns ...string) error {
err := c.subscribe("PSUBSCRIBE", patterns...)
if err == nil {
c.patterns = appendIfNotExists(c.patterns, patterns...)
}
return err
}
// Unsubscribe unsubscribes the client from the given channels, or
// from all of them if none is given.
func (c *PubSub) Unsubscribe(channels ...string) error {
err := c.subscribe("UNSUBSCRIBE", channels...)
if err == nil {
c.channels = remove(c.channels, channels...)
}
return err
}
// PUnsubscribe unsubscribes the client from the given patterns, or
// from all of them if none is given.
func (c *PubSub) PUnsubscribe(patterns ...string) error {
err := c.subscribe("PUNSUBSCRIBE", patterns...)
if err == nil {
c.patterns = remove(c.patterns, patterns...)
}
return err
}
func (c *PubSub) Close() error {
return c.base.Close()
}
func (c *PubSub) Ping(payload ...string) error {
args := []interface{}{"PING"}
if len(payload) == 1 {
args = append(args, payload[0])
}
cmd := NewCmd(args...)
cn, _, err := c.conn()
if err != nil {
return err
}
cn.SetWriteTimeout(c.base.opt.WriteTimeout)
err = writeCmd(cn, cmd)
c.putConn(cn, err)
return err
}
// Subscription is a message received after a successful subscription to a channel.
type Subscription struct {
// Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
Kind string
// Channel name we have subscribed to.
Channel string
// Number of channels we are currently subscribed to.
Count int
}
func (m *Subscription) String() string {
return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
}
// Message is received as a result of a PUBLISH command issued by another client.
type Message struct {
Channel string
Pattern string
Payload string
}
func (m *Message) String() string {
return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
}
// Pong is received as a result of a PING command issued by another client.
type Pong struct {
Payload string
}
func (p *Pong) String() string {
if p.Payload != "" {
return fmt.Sprintf("Pong<%s>", p.Payload)
}
return "Pong"
}
func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
switch reply := reply.(type) {
case string:
return &Pong{
Payload: reply,
}, nil
case []interface{}:
switch kind := reply[0].(string); kind {
case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
return &Subscription{
Kind: kind,
Channel: reply[1].(string),
Count: int(reply[2].(int64)),
}, nil
case "message":
return &Message{
Channel: reply[1].(string),
Payload: reply[2].(string),
}, nil
case "pmessage":
return &Message{
Pattern: reply[1].(string),
Channel: reply[2].(string),
Payload: reply[3].(string),
}, nil
case "pong":
return &Pong{
Payload: reply[1].(string),
}, nil
default:
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
}
default:
return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
}
}
// ReceiveTimeout acts like Receive but returns an error if a message
// is not received in time. This is a low-level API and most clients
// should use ReceiveMessage.
func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
if c.cmd == nil {
c.cmd = NewCmd()
}
cn, _, err := c.conn()
if err != nil {
return nil, err
}
cn.SetReadTimeout(timeout)
err = c.cmd.readReply(cn)
c.putConn(cn, err)
if err != nil {
return nil, err
}
return c.newMessage(c.cmd.Val())
}
// Receive returns a message as a Subscription, Message, Pong or error.
// See the PubSub example for details. This is a low-level API and most clients
// should use ReceiveMessage.
func (c *PubSub) Receive() (interface{}, error) {
return c.ReceiveTimeout(0)
}
// ReceiveMessage returns a Message or an error, ignoring Subscription and
// Pong messages. It automatically reconnects to the Redis server and
// resubscribes to channels in case of network errors.
func (c *PubSub) ReceiveMessage() (*Message, error) {
return c.receiveMessage(5 * time.Second)
}
func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
var errNum uint
for {
msgi, err := c.ReceiveTimeout(timeout)
if err != nil {
if !internal.IsNetworkError(err) {
return nil, err
}
errNum++
if errNum < 3 {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
err := c.Ping()
if err != nil {
internal.Logf("PubSub.Ping failed: %s", err)
}
}
} else {
// 3 consecutive errors - the connection is broken or
// the Redis server is down.
// Sleep so we do not exceed the max number of open connections.
time.Sleep(time.Second)
}
continue
}
// Reset error number, because we received a message.
errNum = 0
switch msg := msgi.(type) {
case *Subscription:
// Ignore.
case *Pong:
// Ignore.
case *Message:
return msg, nil
default:
return nil, fmt.Errorf("redis: unknown message: %T", msgi)
}
}
}
func (c *PubSub) resubscribe() {
if len(c.channels) > 0 {
if err := c.Subscribe(c.channels...); err != nil {
internal.Logf("Subscribe failed: %s", err)
}
}
if len(c.patterns) > 0 {
if err := c.PSubscribe(c.patterns...); err != nil {
internal.Logf("PSubscribe failed: %s", err)
}
}
}
func remove(ss []string, es ...string) []string {
if len(es) == 0 {
return ss[:0]
}
for _, e := range es {
for i, s := range ss {
if s == e {
ss = append(ss[:i], ss[i+1:]...)
break
}
}
}
return ss
}
func appendIfNotExists(ss []string, es ...string) []string {
loop:
for _, e := range es {
for _, s := range ss {
if s == e {
continue loop
}
}
ss = append(ss, e)
}
return ss
}
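A minimal usage sketch, assuming a reachable Redis server and a publisher on the hypothetical "news" channel; ReceiveMessage hides the reconnect/resubscribe loop shown above.

package main

import (
	"fmt"

	"gopkg.in/redis.v5"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	pubsub, err := client.Subscribe("news")
	if err != nil {
		panic(err)
	}
	defer pubsub.Close()
	for {
		msg, err := pubsub.ReceiveMessage() // reconnects and resubscribes on network errors
		if err != nil {
			panic(err)
		}
		fmt.Println(msg.Channel, msg.Payload)
	}
}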

378
go-mysql-redis/vendor/gopkg.in/redis.v5/redis.go generated vendored Normal file
View file

@ -0,0 +1,378 @@
package redis // import "gopkg.in/redis.v5"
import (
"fmt"
"log"
"time"
"gopkg.in/redis.v5/internal"
"gopkg.in/redis.v5/internal/pool"
"gopkg.in/redis.v5/internal/proto"
)
// Nil is the Redis nil reply, e.g. when a key does not exist.
const Nil = internal.Nil
func SetLogger(logger *log.Logger) {
internal.Logger = logger
}
func (c *baseClient) String() string {
return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
}
func (c *baseClient) conn() (*pool.Conn, bool, error) {
cn, isNew, err := c.connPool.Get()
if err != nil {
return nil, false, err
}
if !cn.Inited {
if err := c.initConn(cn); err != nil {
_ = c.connPool.Remove(cn, err)
return nil, false, err
}
}
return cn, isNew, nil
}
func (c *baseClient) putConn(cn *pool.Conn, err error, allowTimeout bool) bool {
if internal.IsBadConn(err, allowTimeout) {
_ = c.connPool.Remove(cn, err)
return false
}
_ = c.connPool.Put(cn)
return true
}
func (c *baseClient) initConn(cn *pool.Conn) error {
cn.Inited = true
if c.opt.Password == "" && c.opt.DB == 0 && !c.opt.ReadOnly {
return nil
}
// Temp client for Auth and Select.
client := newClient(c.opt, pool.NewSingleConnPool(cn))
_, err := client.Pipelined(func(pipe *Pipeline) error {
if c.opt.Password != "" {
pipe.Auth(c.opt.Password)
}
if c.opt.DB > 0 {
pipe.Select(c.opt.DB)
}
if c.opt.ReadOnly {
pipe.ReadOnly()
}
return nil
})
return err
}
func (c *baseClient) Process(cmd Cmder) error {
if c.process != nil {
return c.process(cmd)
}
return c.defaultProcess(cmd)
}
// WrapProcess replaces the process func. It takes a function createWrapper
// which is supplied by the user. createWrapper takes the old process func as
// an input and returns the new wrapper process func. createWrapper
// should call the old process func within the new process func.
func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
c.process = fn(c.defaultProcess)
}
func (c *baseClient) defaultProcess(cmd Cmder) error {
for i := 0; i <= c.opt.MaxRetries; i++ {
cn, _, err := c.conn()
if err != nil {
cmd.setErr(err)
return err
}
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := writeCmd(cn, cmd); err != nil {
c.putConn(cn, err, false)
cmd.setErr(err)
if internal.IsRetryableError(err) {
continue
}
return err
}
cn.SetReadTimeout(c.cmdTimeout(cmd))
err = cmd.readReply(cn)
c.putConn(cn, err, false)
if err != nil && internal.IsRetryableError(err) {
continue
}
return err
}
return cmd.Err()
}
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
if timeout := cmd.readTimeout(); timeout != nil {
return *timeout
}
return c.opt.ReadTimeout
}
// Close closes the client, releasing any open resources.
//
// It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines.
func (c *baseClient) Close() error {
var firstErr error
if c.onClose != nil {
if err := c.onClose(); err != nil && firstErr == nil {
firstErr = err
}
}
if err := c.connPool.Close(); err != nil && firstErr == nil {
firstErr = err
}
return firstErr
}
func (c *baseClient) getAddr() string {
return c.opt.Addr
}
type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer {
return func(cmds []Cmder) error {
var firstErr error
for i := 0; i <= c.opt.MaxRetries; i++ {
cn, _, err := c.conn()
if err != nil {
setCmdsErr(cmds, err)
return err
}
canRetry, err := p(cn, cmds)
c.putConn(cn, err, false)
if err == nil {
return nil
}
if firstErr == nil {
firstErr = err
}
if !canRetry || !internal.IsRetryableError(err) {
break
}
}
return firstErr
}
}
func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := writeCmd(cn, cmds...); err != nil {
setCmdsErr(cmds, err)
return true, err
}
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
return pipelineReadCmds(cn, cmds)
}
func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) (retry bool, firstErr error) {
for i, cmd := range cmds {
err := cmd.readReply(cn)
if err == nil {
continue
}
if i == 0 {
retry = true
}
if firstErr == nil {
firstErr = err
}
}
return retry, firstErr
}
func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := txPipelineWriteMulti(cn, cmds); err != nil {
setCmdsErr(cmds, err)
return true, err
}
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
if err := c.txPipelineReadQueued(cn, cmds); err != nil {
return false, err
}
_, err := pipelineReadCmds(cn, cmds)
return false, err
}
func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
multiExec := make([]Cmder, 0, len(cmds)+2)
multiExec = append(multiExec, NewStatusCmd("MULTI"))
multiExec = append(multiExec, cmds...)
multiExec = append(multiExec, NewSliceCmd("EXEC"))
return writeCmd(cn, multiExec...)
}
func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error {
var firstErr error
// Parse queued replies.
var statusCmd StatusCmd
if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
firstErr = err
}
for _, cmd := range cmds {
err := statusCmd.readReply(cn)
if err != nil {
cmd.setErr(err)
if firstErr == nil {
firstErr = err
}
}
}
// Parse number of replies.
line, err := cn.Rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr
}
return err
}
switch line[0] {
case proto.ErrorReply:
return proto.ParseErrorReply(line)
case proto.ArrayReply:
// ok
default:
err := fmt.Errorf("redis: expected '*', but got line %q", line)
return err
}
return nil
}
//------------------------------------------------------------------------------
// Client is a Redis client representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
type Client struct {
baseClient
cmdable
}
func newClient(opt *Options, pool pool.Pooler) *Client {
client := Client{
baseClient: baseClient{
opt: opt,
connPool: pool,
},
}
client.cmdable.process = client.Process
return &client
}
// NewClient returns a client to the Redis Server specified by Options.
func NewClient(opt *Options) *Client {
opt.init()
return newClient(opt, newConnPool(opt))
}
func (c *Client) copy() *Client {
c2 := new(Client)
*c2 = *c
c2.cmdable.process = c2.Process
return c2
}
// PoolStats returns connection pool stats.
func (c *Client) PoolStats() *PoolStats {
s := c.connPool.Stats()
return &PoolStats{
Requests: s.Requests,
Hits: s.Hits,
Timeouts: s.Timeouts,
TotalConns: s.TotalConns,
FreeConns: s.FreeConns,
}
}
func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}
func (c *Client) Pipeline() *Pipeline {
pipe := Pipeline{
exec: c.pipelineExecer(c.pipelineProcessCmds),
}
pipe.cmdable.process = pipe.Process
pipe.statefulCmdable.process = pipe.Process
return &pipe
}
func (c *Client) TxPipelined(fn func(*Pipeline) error) ([]Cmder, error) {
return c.TxPipeline().pipelined(fn)
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Client) TxPipeline() *Pipeline {
pipe := Pipeline{
exec: c.pipelineExecer(c.txPipelineProcessCmds),
}
pipe.cmdable.process = pipe.Process
pipe.statefulCmdable.process = pipe.Process
return &pipe
}
func (c *Client) pubSub() *PubSub {
return &PubSub{
base: baseClient{
opt: c.opt,
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
},
}
}
// Subscribe subscribes the client to the specified channels.
func (c *Client) Subscribe(channels ...string) (*PubSub, error) {
pubsub := c.pubSub()
if len(channels) > 0 {
if err := pubsub.Subscribe(channels...); err != nil {
pubsub.Close()
return nil, err
}
}
return pubsub, nil
}
// PSubscribe subscribes the client to the given patterns.
func (c *Client) PSubscribe(channels ...string) (*PubSub, error) {
pubsub := c.pubSub()
if len(channels) > 0 {
if err := pubsub.PSubscribe(channels...); err != nil {
pubsub.Close()
return nil, err
}
}
return pubsub, nil
}
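A minimal client lifecycle sketch, assuming a Redis server at localhost:6379; the key and value are illustrative.

package main

import (
	"fmt"

	"gopkg.in/redis.v5"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()
	if err := client.Set("key", "value", 0).Err(); err != nil { // 0 = no expiration
		panic(err)
	}
	val, err := client.Get("key").Result()
	fmt.Println(val, err) // value <nil>
}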

View file

@ -0,0 +1,35 @@
// +build go1.7
package redis
import (
"context"
"gopkg.in/redis.v5/internal/pool"
)
type baseClient struct {
connPool pool.Pooler
opt *Options
process func(Cmder) error
onClose func() error // hook called when client is closed
ctx context.Context
}
func (c *Client) Context() context.Context {
if c.ctx != nil {
return c.ctx
}
return context.Background()
}
func (c *Client) WithContext(ctx context.Context) *Client {
if ctx == nil {
panic("nil context")
}
c2 := c.copy()
c2.ctx = ctx
return c2
}

View file

@ -0,0 +1,15 @@
// +build !go1.7
package redis
import (
"gopkg.in/redis.v5/internal/pool"
)
type baseClient struct {
connPool pool.Pooler
opt *Options
process func(Cmder) error
onClose func() error // hook called when client is closed
}

140
go-mysql-redis/vendor/gopkg.in/redis.v5/result.go generated vendored Normal file
View file

@ -0,0 +1,140 @@
package redis
import "time"
// NewCmdResult returns a Cmd initialised with val and err for testing
func NewCmdResult(val interface{}, err error) *Cmd {
var cmd Cmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewSliceResult returns a SliceCmd initialised with val and err for testing
func NewSliceResult(val []interface{}, err error) *SliceCmd {
var cmd SliceCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewStatusResult returns a StatusCmd initialised with val and err for testing
func NewStatusResult(val string, err error) *StatusCmd {
var cmd StatusCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewIntResult returns an IntCmd initialised with val and err for testing
func NewIntResult(val int64, err error) *IntCmd {
var cmd IntCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewDurationResult returns a DurationCmd initialised with val and err for testing
func NewDurationResult(val time.Duration, err error) *DurationCmd {
var cmd DurationCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewBoolResult returns a BoolCmd initialised with val and err for testing
func NewBoolResult(val bool, err error) *BoolCmd {
var cmd BoolCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewStringResult returns a StringCmd initialised with val and err for testing
func NewStringResult(val string, err error) *StringCmd {
var cmd StringCmd
cmd.val = []byte(val)
cmd.setErr(err)
return &cmd
}
// NewFloatResult returns a FloatCmd initialised with val and err for testing
func NewFloatResult(val float64, err error) *FloatCmd {
var cmd FloatCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewStringSliceResult returns a StringSliceCmd initialised with val and err for testing
func NewStringSliceResult(val []string, err error) *StringSliceCmd {
var cmd StringSliceCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewBoolSliceResult returns a BoolSliceCmd initialised with val and err for testing
func NewBoolSliceResult(val []bool, err error) *BoolSliceCmd {
var cmd BoolSliceCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewStringStringMapResult returns a StringStringMapCmd initialised with val and err for testing
func NewStringStringMapResult(val map[string]string, err error) *StringStringMapCmd {
var cmd StringStringMapCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewStringIntMapCmdResult returns a StringIntMapCmd initialised with val and err for testing
func NewStringIntMapCmdResult(val map[string]int64, err error) *StringIntMapCmd {
var cmd StringIntMapCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewZSliceCmdResult returns a ZSliceCmd initialised with val and err for testing
func NewZSliceCmdResult(val []Z, err error) *ZSliceCmd {
var cmd ZSliceCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewScanCmdResult returns a ScanCmd initialised with val and err for testing
func NewScanCmdResult(keys []string, cursor uint64, err error) *ScanCmd {
var cmd ScanCmd
cmd.page = keys
cmd.cursor = cursor
cmd.setErr(err)
return &cmd
}
// NewClusterSlotsCmdResult returns a ClusterSlotsCmd initialised with val and err for testing
func NewClusterSlotsCmdResult(val []ClusterSlot, err error) *ClusterSlotsCmd {
var cmd ClusterSlotsCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
// NewGeoLocationCmdResult returns a GeoLocationCmd initialised with val and err for testing
func NewGeoLocationCmdResult(val []GeoLocation, err error) *GeoLocationCmd {
var cmd GeoLocationCmd
cmd.locations = val
cmd.setErr(err)
return &cmd
}
// NewCommandsInfoCmdResult returns a CommandsInfoCmd initialised with val and err for testing
func NewCommandsInfoCmdResult(val map[string]*CommandInfo, err error) *CommandsInfoCmd {
var cmd CommandsInfoCmd
cmd.val = val
cmd.setErr(err)
return &cmd
}
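A minimal sketch of how these constructors can stub command results in tests without a live server; the values are illustrative.

package main

import (
	"fmt"

	"gopkg.in/redis.v5"
)

func main() {
	hit := redis.NewStringResult("cached-value", nil)
	miss := redis.NewStringResult("", redis.Nil) // simulate a missing key
	fmt.Println(hit.Val(), hit.Err())    // cached-value <nil>
	fmt.Println(miss.Err() == redis.Nil) // true
}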

420
go-mysql-redis/vendor/gopkg.in/redis.v5/ring.go generated vendored Normal file
View file

@ -0,0 +1,420 @@
package redis
import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"
"gopkg.in/redis.v5/internal"
"gopkg.in/redis.v5/internal/consistenthash"
"gopkg.in/redis.v5/internal/hashtag"
"gopkg.in/redis.v5/internal/pool"
)
var errRingShardsDown = errors.New("redis: all ring shards are down")
// RingOptions are used to configure a ring client and should be
// passed to NewRing.
type RingOptions struct {
// Map of name => host:port addresses of ring shards.
Addrs map[string]string
// Frequency of PING commands sent to check shards availability.
// A shard is considered down after 3 consecutive failed checks.
HeartbeatFrequency time.Duration
// Following options are copied from Options struct.
DB int
Password string
MaxRetries int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
}
func (opt *RingOptions) init() {
if opt.HeartbeatFrequency == 0 {
opt.HeartbeatFrequency = 500 * time.Millisecond
}
}
func (opt *RingOptions) clientOptions() *Options {
return &Options{
DB: opt.DB,
Password: opt.Password,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
}
}
type ringShard struct {
Client *Client
down int32
}
func (shard *ringShard) String() string {
var state string
if shard.IsUp() {
state = "up"
} else {
state = "down"
}
return fmt.Sprintf("%s is %s", shard.Client, state)
}
func (shard *ringShard) IsDown() bool {
const threshold = 3
return atomic.LoadInt32(&shard.down) >= threshold
}
func (shard *ringShard) IsUp() bool {
return !shard.IsDown()
}
// Vote votes to set shard state and returns true if state was changed.
func (shard *ringShard) Vote(up bool) bool {
if up {
changed := shard.IsDown()
atomic.StoreInt32(&shard.down, 0)
return changed
}
if shard.IsDown() {
return false
}
atomic.AddInt32(&shard.down, 1)
return shard.IsDown()
}
// Ring is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
//
// Ring monitors the state of each shard and removes dead shards from
// the ring. When a shard comes online it is added back to the ring. This
// gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client
// uses shards that are available to the client and does not do any
// coordination when shard state is changed.
//
// Ring should be used when you need multiple Redis servers for caching
// and can tolerate losing data when one of the servers dies.
// Otherwise you should use Redis Cluster.
type Ring struct {
cmdable
opt *RingOptions
nreplicas int
mu sync.RWMutex
hash *consistenthash.Map
shards map[string]*ringShard
cmdsInfoOnce *sync.Once
cmdsInfo map[string]*CommandInfo
closed bool
}
func NewRing(opt *RingOptions) *Ring {
const nreplicas = 100
opt.init()
ring := &Ring{
opt: opt,
nreplicas: nreplicas,
hash: consistenthash.New(nreplicas, nil),
shards: make(map[string]*ringShard),
cmdsInfoOnce: new(sync.Once),
}
ring.cmdable.process = ring.Process
for name, addr := range opt.Addrs {
clopt := opt.clientOptions()
clopt.Addr = addr
ring.addClient(name, NewClient(clopt))
}
go ring.heartbeat()
return ring
}
// PoolStats returns accumulated connection pool stats.
func (c *Ring) PoolStats() *PoolStats {
var acc PoolStats
for _, shard := range c.shards {
s := shard.Client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
}
return &acc
}
// ForEachShard concurrently calls fn on each live shard in the ring.
// It returns the first error if any.
func (c *Ring) ForEachShard(fn func(client *Client) error) error {
var wg sync.WaitGroup
errCh := make(chan error, 1)
for _, shard := range c.shards {
if shard.IsDown() {
continue
}
wg.Add(1)
go func(shard *ringShard) {
defer wg.Done()
err := fn(shard.Client)
if err != nil {
select {
case errCh <- err:
default:
}
}
}(shard)
}
wg.Wait()
select {
case err := <-errCh:
return err
default:
return nil
}
}
func (c *Ring) cmdInfo(name string) *CommandInfo {
c.cmdsInfoOnce.Do(func() {
for _, shard := range c.shards {
cmdsInfo, err := shard.Client.Command().Result()
if err == nil {
c.cmdsInfo = cmdsInfo
return
}
}
c.cmdsInfoOnce = &sync.Once{}
})
if c.cmdsInfo == nil {
return nil
}
return c.cmdsInfo[name]
}
func (c *Ring) addClient(name string, cl *Client) {
c.mu.Lock()
c.hash.Add(name)
c.shards[name] = &ringShard{Client: cl}
c.mu.Unlock()
}
func (c *Ring) shardByKey(key string) (*ringShard, error) {
key = hashtag.Key(key)
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
return nil, pool.ErrClosed
}
name := c.hash.Get(key)
if name == "" {
c.mu.RUnlock()
return nil, errRingShardsDown
}
shard := c.shards[name]
c.mu.RUnlock()
return shard, nil
}
func (c *Ring) randomShard() (*ringShard, error) {
return c.shardByKey(strconv.Itoa(rand.Int()))
}
func (c *Ring) shardByName(name string) (*ringShard, error) {
if name == "" {
return c.randomShard()
}
c.mu.RLock()
shard := c.shards[name]
c.mu.RUnlock()
return shard, nil
}
func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
cmdInfo := c.cmdInfo(cmd.name())
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
return c.shardByKey(firstKey)
}
func (c *Ring) Process(cmd Cmder) error {
shard, err := c.cmdShard(cmd)
if err != nil {
cmd.setErr(err)
return err
}
return shard.Client.Process(cmd)
}
// rebalance removes dead shards from the Ring.
func (c *Ring) rebalance() {
hash := consistenthash.New(c.nreplicas, nil)
for name, shard := range c.shards {
if shard.IsUp() {
hash.Add(name)
}
}
c.mu.Lock()
c.hash = hash
c.mu.Unlock()
}
// heartbeat monitors state of each shard in the ring.
func (c *Ring) heartbeat() {
ticker := time.NewTicker(c.opt.HeartbeatFrequency)
defer ticker.Stop()
for range ticker.C {
var rebalance bool
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
break
}
for _, shard := range c.shards {
err := shard.Client.Ping().Err()
if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
internal.Logf("ring shard state changed: %s", shard)
rebalance = true
}
}
c.mu.RUnlock()
if rebalance {
c.rebalance()
}
}
}
// Close closes the ring client, releasing any open resources.
//
// It is rare to Close a Ring, as the Ring is meant to be long-lived
// and shared between many goroutines.
func (c *Ring) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil
}
c.closed = true
var firstErr error
for _, shard := range c.shards {
if err := shard.Client.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
c.hash = nil
c.shards = nil
return firstErr
}
func (c *Ring) Pipeline() *Pipeline {
pipe := Pipeline{
exec: c.pipelineExec,
}
pipe.cmdable.process = pipe.Process
pipe.statefulCmdable.process = pipe.Process
return &pipe
}
func (c *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}
func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
cmdsMap := make(map[string][]Cmder)
for _, cmd := range cmds {
cmdInfo := c.cmdInfo(cmd.name())
name := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
if name != "" {
name = c.hash.Get(hashtag.Key(name))
}
cmdsMap[name] = append(cmdsMap[name], cmd)
}
for i := 0; i <= c.opt.MaxRetries; i++ {
var failedCmdsMap map[string][]Cmder
for name, cmds := range cmdsMap {
shard, err := c.shardByName(name)
if err != nil {
setCmdsErr(cmds, err)
if firstErr == nil {
firstErr = err
}
continue
}
cn, _, err := shard.Client.conn()
if err != nil {
setCmdsErr(cmds, err)
if firstErr == nil {
firstErr = err
}
continue
}
canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
shard.Client.putConn(cn, err, false)
if err == nil {
continue
}
if firstErr == nil {
firstErr = err
}
if canRetry && internal.IsRetryableError(err) {
if failedCmdsMap == nil {
failedCmdsMap = make(map[string][]Cmder)
}
failedCmdsMap[name] = cmds
}
}
if len(failedCmdsMap) == 0 {
break
}
cmdsMap = failedCmdsMap
}
return firstErr
}
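A minimal sketch of a two-shard ring, assuming Redis servers on the example ports; the shard names and key are illustrative.

package main

import "gopkg.in/redis.v5"

func main() {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
			"shard1": "localhost:7000",
			"shard2": "localhost:7001",
		},
	})
	defer ring.Close()
	// Each key is routed to one shard by consistent hashing;
	// {hash tags} can pin related keys to the same shard.
	if err := ring.Set("user:1", "alice", 0).Err(); err != nil {
		panic(err)
	}
}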

56
go-mysql-redis/vendor/gopkg.in/redis.v5/script.go generated vendored Normal file
View file

@ -0,0 +1,56 @@
package redis
import (
"crypto/sha1"
"encoding/hex"
"io"
"strings"
)
type scripter interface {
Eval(script string, keys []string, args ...interface{}) *Cmd
EvalSha(sha1 string, keys []string, args ...interface{}) *Cmd
ScriptExists(scripts ...string) *BoolSliceCmd
ScriptLoad(script string) *StringCmd
}
var _ scripter = (*Client)(nil)
var _ scripter = (*Ring)(nil)
var _ scripter = (*ClusterClient)(nil)
type Script struct {
src, hash string
}
func NewScript(src string) *Script {
h := sha1.New()
io.WriteString(h, src)
return &Script{
src: src,
hash: hex.EncodeToString(h.Sum(nil)),
}
}
func (s *Script) Load(c scripter) *StringCmd {
return c.ScriptLoad(s.src)
}
func (s *Script) Exists(c scripter) *BoolSliceCmd {
// SCRIPT EXISTS expects SHA1 digests, so check the hash, not the source.
return c.ScriptExists(s.hash)
}
func (s *Script) Eval(c scripter, keys []string, args ...interface{}) *Cmd {
return c.Eval(s.src, keys, args...)
}
func (s *Script) EvalSha(c scripter, keys []string, args ...interface{}) *Cmd {
return c.EvalSha(s.hash, keys, args...)
}
func (s *Script) Run(c scripter, keys []string, args ...interface{}) *Cmd {
r := s.EvalSha(c, keys, args...)
if err := r.Err(); err != nil && strings.HasPrefix(err.Error(), "NOSCRIPT ") {
return s.Eval(c, keys, args...)
}
return r
}
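// Illustrative usage sketch, not part of the vendored source; the script
// body and key name are assumptions. Run tries EVALSHA first and, on a
// NOSCRIPT error, falls back to EVAL, which also loads the script.
package main

import (
	"fmt"

	"gopkg.in/redis.v5"
)

var incrBy = redis.NewScript(`return redis.call("INCRBY", KEYS[1], ARGV[1])`)

func main() {
	// *redis.Client satisfies the scripter interface, as do Ring and
	// ClusterClient, so the same Script works against any of them.
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()

	v, err := incrBy.Run(client, []string{"counter"}, 2).Result()
	if err != nil {
		fmt.Println("script failed:", err)
		return
	}
	fmt.Println("counter is now", v.(int64)) // Lua integers arrive as int64
}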

335
go-mysql-redis/vendor/gopkg.in/redis.v5/sentinel.go generated vendored Normal file
View file

@ -0,0 +1,335 @@
package redis
import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
"gopkg.in/redis.v5/internal"
"gopkg.in/redis.v5/internal/pool"
)
//------------------------------------------------------------------------------
// FailoverOptions are used to configure a failover client and should
// be passed to NewFailoverClient.
type FailoverOptions struct {
// The master name.
MasterName string
// A seed list of host:port addresses of sentinel nodes.
SentinelAddrs []string
// The following options are copied from the Options struct.
Password string
DB int
MaxRetries int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
}
func (opt *FailoverOptions) options() *Options {
return &Options{
Addr: "FailoverClient",
DB: opt.DB,
Password: opt.Password,
MaxRetries: opt.MaxRetries,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
}
}
// NewFailoverClient returns a Redis client that uses Redis Sentinel
// for automatic failover. It's safe for concurrent use by multiple
// goroutines.
func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
opt := failoverOpt.options()
opt.init()
failover := &sentinelFailover{
masterName: failoverOpt.MasterName,
sentinelAddrs: failoverOpt.SentinelAddrs,
opt: opt,
}
client := Client{
baseClient: baseClient{
opt: opt,
connPool: failover.Pool(),
onClose: func() error {
return failover.Close()
},
},
}
client.cmdable.process = client.Process
return &client
}
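// Usage sketch, not part of the vendored source (the master name and
// sentinel addresses are assumptions):
//
//	client := redis.NewFailoverClient(&redis.FailoverOptions{
//		MasterName:    "mymaster",
//		SentinelAddrs: []string{"localhost:26379", "localhost:26380"},
//	})
//	defer client.Close()
//
//	// The result behaves like a regular *redis.Client; master discovery
//	// and failover are handled by sentinelFailover below.
//	err := client.Ping().Err()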
//------------------------------------------------------------------------------
type sentinelClient struct {
cmdable
baseClient
}
func newSentinel(opt *Options) *sentinelClient {
opt.init()
client := sentinelClient{
baseClient: baseClient{
opt: opt,
connPool: newConnPool(opt),
},
}
client.cmdable = cmdable{client.Process}
return &client
}
func (c *sentinelClient) PubSub() *PubSub {
return &PubSub{
base: baseClient{
opt: c.opt,
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
},
}
}
func (c *sentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
c.Process(cmd)
return cmd
}
func (c *sentinelClient) Sentinels(name string) *SliceCmd {
cmd := NewSliceCmd("SENTINEL", "sentinels", name)
c.Process(cmd)
return cmd
}
type sentinelFailover struct {
masterName string
sentinelAddrs []string
opt *Options
pool *pool.ConnPool
poolOnce sync.Once
mu sync.RWMutex
sentinel *sentinelClient
}
func (d *sentinelFailover) Close() error {
return d.resetSentinel()
}
func (d *sentinelFailover) dial() (net.Conn, error) {
addr, err := d.MasterAddr()
if err != nil {
return nil, err
}
return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
}
func (d *sentinelFailover) Pool() *pool.ConnPool {
d.poolOnce.Do(func() {
d.opt.Dialer = d.dial
d.pool = newConnPool(d.opt)
})
return d.pool
}
func (d *sentinelFailover) MasterAddr() (string, error) {
d.mu.Lock()
defer d.mu.Unlock()
// Try the last working sentinel first.
if d.sentinel != nil {
addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
if err != nil {
internal.Logf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
d._resetSentinel()
} else {
addr := net.JoinHostPort(addr[0], addr[1])
internal.Logf("sentinel: %q addr is %s", d.masterName, addr)
return addr, nil
}
}
for i, sentinelAddr := range d.sentinelAddrs {
sentinel := newSentinel(&Options{
Addr: sentinelAddr,
DialTimeout: d.opt.DialTimeout,
ReadTimeout: d.opt.ReadTimeout,
WriteTimeout: d.opt.WriteTimeout,
PoolSize: d.opt.PoolSize,
PoolTimeout: d.opt.PoolTimeout,
IdleTimeout: d.opt.IdleTimeout,
})
masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
if err != nil {
internal.Logf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
sentinel.Close()
continue
}
// Push working sentinel to the top.
d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
d.setSentinel(sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
internal.Logf("sentinel: %q addr is %s", d.masterName, addr)
return addr, nil
}
return "", errors.New("redis: all sentinels are unreachable")
}
func (d *sentinelFailover) setSentinel(sentinel *sentinelClient) {
d.discoverSentinels(sentinel)
d.sentinel = sentinel
go d.listen(sentinel)
}
func (d *sentinelFailover) resetSentinel() error {
d.mu.Lock()
err := d._resetSentinel()
d.mu.Unlock()
return err
}
func (d *sentinelFailover) _resetSentinel() error {
var err error
if d.sentinel != nil {
err = d.sentinel.Close()
d.sentinel = nil
}
return err
}
func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {
sentinels, err := sentinel.Sentinels(d.masterName).Result()
if err != nil {
internal.Logf("sentinel: Sentinels %q failed: %s", d.masterName, err)
return
}
for _, sentinel := range sentinels {
vals := sentinel.([]interface{})
for i := 0; i < len(vals); i += 2 {
key := vals[i].(string)
if key == "name" {
sentinelAddr := vals[i+1].(string)
if !contains(d.sentinelAddrs, sentinelAddr) {
internal.Logf(
"sentinel: discovered new %q sentinel: %s",
d.masterName, sentinelAddr,
)
d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
}
}
}
}
}
// closeOldConns closes connections to the old master after a failover switch.
func (d *sentinelFailover) closeOldConns(newMaster string) {
// Good connections that should be put back to the pool. They
// can't be put back immediately, because pool.PopFree would return
// them again on the next iteration.
cnsToPut := make([]*pool.Conn, 0)
for {
cn := d.pool.PopFree()
if cn == nil {
break
}
if cn.RemoteAddr().String() != newMaster {
err := fmt.Errorf(
"sentinel: closing connection to the old master %s",
cn.RemoteAddr(),
)
internal.Logf(err.Error())
d.pool.Remove(cn, err)
} else {
cnsToPut = append(cnsToPut, cn)
}
}
for _, cn := range cnsToPut {
d.pool.Put(cn)
}
}
func (d *sentinelFailover) listen(sentinel *sentinelClient) {
var pubsub *PubSub
for {
if pubsub == nil {
pubsub = sentinel.PubSub()
if err := pubsub.Subscribe("+switch-master"); err != nil {
internal.Logf("sentinel: Subscribe failed: %s", err)
pubsub.Close()
d.resetSentinel()
return
}
}
msg, err := pubsub.ReceiveMessage()
if err != nil {
internal.Logf("sentinel: ReceiveMessage failed: %s", err)
pubsub.Close()
d.resetSentinel()
return
}
switch msg.Channel {
case "+switch-master":
parts := strings.Split(msg.Payload, " ")
if parts[0] != d.masterName {
internal.Logf("sentinel: ignore new %s addr", parts[0])
continue
}
addr := net.JoinHostPort(parts[3], parts[4])
internal.Logf(
"sentinel: new %q addr is %s",
d.masterName, addr,
)
d.closeOldConns(addr)
}
}
}
func contains(slice []string, str string) bool {
for _, s := range slice {
if s == str {
return true
}
}
return false
}

99
go-mysql-redis/vendor/gopkg.in/redis.v5/tx.go generated vendored Normal file
View file

@ -0,0 +1,99 @@
package redis
import (
"gopkg.in/redis.v5/internal"
"gopkg.in/redis.v5/internal/pool"
)
// Redis transaction failed.
const TxFailedErr = internal.RedisError("redis: transaction failed")
// Tx implements Redis transactions as described in
// http://redis.io/topics/transactions. It's NOT safe for concurrent use
// by multiple goroutines, because Exec resets the list of watched keys.
// If you don't need WATCH, it is better to use Pipeline.
type Tx struct {
cmdable
statefulCmdable
baseClient
}
func (c *Client) newTx() *Tx {
tx := Tx{
baseClient: baseClient{
opt: c.opt,
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true),
},
}
tx.cmdable.process = tx.Process
tx.statefulCmdable.process = tx.Process
return &tx
}
func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
tx := c.newTx()
if len(keys) > 0 {
if err := tx.Watch(keys...).Err(); err != nil {
_ = tx.Close()
return err
}
}
firstErr := fn(tx)
if err := tx.Close(); err != nil && firstErr == nil {
firstErr = err
}
return firstErr
}
// Close closes the transaction, releasing any open resources.
func (c *Tx) Close() error {
_ = c.Unwatch().Err()
return c.baseClient.Close()
}
// Watch marks the keys to be watched for conditional execution
// of a transaction.
func (c *Tx) Watch(keys ...string) *StatusCmd {
args := make([]interface{}, 1+len(keys))
args[0] = "WATCH"
for i, key := range keys {
args[1+i] = key
}
cmd := NewStatusCmd(args...)
c.Process(cmd)
return cmd
}
// Unwatch flushes all the previously watched keys for a transaction.
func (c *Tx) Unwatch(keys ...string) *StatusCmd {
args := make([]interface{}, 1+len(keys))
args[0] = "UNWATCH"
for i, key := range keys {
args[1+i] = key
}
cmd := NewStatusCmd(args...)
c.Process(cmd)
return cmd
}
func (c *Tx) Pipeline() *Pipeline {
pipe := Pipeline{
exec: c.pipelineExecer(c.txPipelineProcessCmds),
}
pipe.cmdable.process = pipe.Process
pipe.statefulCmdable.process = pipe.Process
return &pipe
}
// Pipelined executes the commands queued in fn within a transaction
// and restores the connection state to normal afterwards.
//
// When using WATCH, EXEC will execute commands only if the watched keys
// were not modified, allowing for a check-and-set mechanism.
//
// Exec always returns a list of commands. If the transaction fails,
// TxFailedErr is returned. Otherwise Exec returns the error of the first
// failed command, or nil.
func (c *Tx) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}
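// Illustrative usage sketch, not part of the vendored source; the key name
// is an assumption. It shows the check-and-set pattern the comments above
// describe: WATCH a key, read it, then queue writes in a MULTI/EXEC block
// that only runs if the key was not modified in between.
package main

import (
	"fmt"
	"strconv"

	"gopkg.in/redis.v5"
)

// increment reads a counter and writes counter+1 inside a transaction.
// If another client changes the key between Watch and Exec, Pipelined
// returns TxFailedErr and the operation can simply be retried.
func increment(client *redis.Client, key string) error {
	return client.Watch(func(tx *redis.Tx) error {
		n, err := tx.Get(key).Int64()
		if err != nil && err != redis.Nil {
			return err
		}
		_, err = tx.Pipelined(func(pipe *redis.Pipeline) error {
			pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
			return nil
		})
		return err
	}, key)
}

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()

	if err := increment(client, "counter"); err != nil {
		fmt.Println("increment failed:", err)
	}
}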