前面我们学习了并发以及解决并发的常用方式;今天我们看看Go语言实现的三种常用并发模式;这些模式可以在实际生产应用中合理使用,免去了我们造轮子的过程。
我们说过通道的性能并不是太好,这个在大批量循环处理的时候的确有些问题。性能不是关键诉求的场景下还是可以的,他能简化编程。这里讲的三种并发模式,都需要结合使用通道和协程。
Runner
下面给出完整的Runner代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
package runner
import (
"errors"
"log"
"os"
"os/signal"
"time"
)
// 给定时间内执行一组任务
// 在操作系统发送中断信号时结束这组任务
type Runner struct {
interrupt chan os.Signal
complete chan error
timeout <-chan time.Time
tasks []func(int)
}
var ErrTimeout = errors.New("received timeout")
var ErrInterrupt = errors.New("received interrupt")
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
}
}
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
func (r *Runner) Start() error {
signal.Notify(r.interrupt, os.Interrupt)
go func() {
r.complete <- r.run()
}()
select {
case err := <-r.complete:
return err
case <-r.timeout:
return ErrTimeout
}
}
func (r *Runner) run() error {
for id, task := range r.tasks {
if r.gotInterrupt() {
return ErrInterrupt
}
task(id)
}
return nil
}
func (r *Runner) gotInterrupt() bool {
select {
case <-r.interrupt:
signal.Stop(r.interrupt)
return true
default:
return false
}
}
|
我们再写一个测试的例子看看,上面的Runner工具包的效果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
const timeout = 3 * time.Second
func main() {
log.Println("Starting work.")
r := New(timeout)
r.Add(createTask(), createTask(), createTask())
if err := r.Start(); err != nil {
switch err {
case ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case ErrInterrupt:
log.Println("Terminating due to interrupt.")
}
}
log.Println("Process ended.")
}
func createTask() func(int) {
return func(id int) {
log.Printf("Processor -> Task #%d.", id)
time.Sleep(time.Duration(id) * time.Second)
}
}
|
试一试看看结果:
2021/01/13 00:30:15 Starting work.
2021/01/13 00:30:15 Processor -> Task #0.
2021/01/13 00:30:15 Processor -> Task #1.
2021/01/13 00:30:16 Processor -> Task #2.
2021/01/13 00:30:18 Terminating due to timeout.
这个Runner模式还是很有代表性的,他能把(任务队列,超时,系统中断信号)等结合起来形成一项定时任务。任何一个条件满足触发,程序就结束了。
Pool
缓存池的概念随处可见。这里模拟一个缓存池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
package pool
import (
"errors"
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
)
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
var ErrPoolClosed = errors.New("Pool has been closed.")
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("Size value tool small.")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
func (p *Pool) Acquire() (io.Closer, error) {
select {
case r, ok := <-p.resources:
log.Println("Acquire: ", "Shared Resource")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
default:
log.Println("Acquire: ", "New Resource")
return p.factory()
}
}
func (p *Pool) Release(r io.Closer) {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
r.Close()
return
}
select {
case p.resources <- r:
log.Println("Release:", "In Queue")
default:
log.Println("Release: ", "Closing")
r.Close()
}
}
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
close(p.resources)
for r := range p.resources {
r.Close()
}
}
|
执行测试:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
const (
maxGoroutines = 5
pooledResources = 2
)
type dbConnection struct {
ID int32
}
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
var idCounter int32
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection ", id)
return &dbConnection{id}, nil
}
func TryPool() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
p, err := New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
for query := 0; query < maxGoroutines; query++ {
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
wg.Wait()
log.Println("Shutdown Program.")
}
func performQueries(query int, p *Pool) {
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
defer p.Release(conn)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n\n", query, conn.(*dbConnection).ID)
}
|
大家可以自行试一下结果。
Work
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
package work
import (
"log"
"sync"
"time"
)
type Worker interface {
Task()
}
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
func New(maxGoroutines int) *Pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
}
return &p
}
func (p *Pool) Run(w Worker) {
p.work <- w
}
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}
|
测试程序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
var names = []string{
"steve",
"bob",
"mary",
"therese",
"jason",
}
type namePrinter struct {
name string
}
func (m *namePrinter) Task() {
log.Println(m.name)
time.Sleep(time.Second)
}
func TryWork() {
p := New(2)
var wg sync.WaitGroup
wg.Add(100 * len(names))
for i := 0; i < 100; i++ {
for _, name := range names {
np := namePrinter{
name: name,
}
go func() {
p.Run(&np)
wg.Done()
}()
}
}
wg.Wait()
p.Shutdown()
}
|
以上三种示例具体讲解可以参阅《Go语言实战》。
(完)