-
Notifications
You must be signed in to change notification settings - Fork 6
/
rawJob.go
147 lines (125 loc) · 3.82 KB
/
rawJob.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package beanstalkworker
import "time"
import "github.com/beanstalkd/go-beanstalk"
import "fmt"
// Actions the user can choose in case of an unmarshal error.
const (
ActionDeleteJob = "delete"
ActionBuryJob = "bury"
ActionReleaseJob = "release"
)
// RawJob represents the raw job data that is returned by beanstalkd.
type RawJob struct {
id uint64
err error
body *[]byte
conn *beanstalk.Conn
tube string
prio uint32
releases uint32
reserves uint32
timeouts uint32
delay time.Duration
age time.Duration
returnPrio uint32
returnDelay time.Duration
log *Logger
}
// NewEmptyJob initialises a new empty RawJob with a custom logger.
// Useful for testing methods that log messages on the job.
func NewEmptyJob(cl CustomLogger) *RawJob {
logger := &Logger{
Info: cl.Info,
Infof: cl.Infof,
Error: cl.Error,
Errorf: cl.Errorf,
}
return &RawJob{
log: logger,
}
}
// Delete function deletes the job from the queue.
func (job *RawJob) Delete() {
if err := job.conn.Delete(job.id); err != nil {
job.log.Error("Could not delete job: " + err.Error())
}
}
// Touch function touches the job from the queue.
func (job *RawJob) Touch() {
if err := job.conn.Touch(job.id); err != nil {
job.log.Error("Could not touch job: " + err.Error())
}
}
// Release function releases the job from the queue.
func (job *RawJob) Release() {
if err := job.conn.Release(job.id, job.returnPrio, job.returnDelay); err != nil {
job.log.Error("Could not release job: " + err.Error())
}
}
// Bury function buries the job from the queue.
func (job *RawJob) Bury() {
if err := job.conn.Bury(job.id, job.returnPrio); err != nil {
job.log.Error("Could not bury job: " + err.Error())
}
}
// SetReturnPriority sets the return priority to use if a job is released or buried.
func (job *RawJob) SetReturnPriority(prio uint32) {
job.returnPrio = prio
}
// SetReturnDelay sets the return delay to use if a job is released back to queue.
func (job *RawJob) SetReturnDelay(delay time.Duration) {
job.returnDelay = delay
}
// GetAge gets the age of the job from the job stats.
func (job *RawJob) GetAge() time.Duration {
return job.age
}
// GetDelay gets the delay of the job from the job stats.
func (job *RawJob) GetDelay() time.Duration {
return job.delay
}
// GetPriority gets the priority of the job.
func (job *RawJob) GetPriority() uint32 {
return job.prio
}
// GetReleases gets the count of release of the job.
func (job *RawJob) GetReleases() uint32 {
return job.releases
}
// GetReserves gets the count of reserves of the job.
func (job *RawJob) GetReserves() uint32 {
return job.reserves
}
// GetTimeouts gets the count of timeouts of the job.
func (job *RawJob) GetTimeouts() uint32 {
return job.timeouts
}
// GetTube returns the tube name we got this job from.
func (job *RawJob) GetTube() string {
return job.tube
}
// GetConn returns the beanstalk connection used to receive the job.
func (job *RawJob) GetConn() *beanstalk.Conn {
return job.conn
}
// LogError function logs an error message regarding the job.
func (job *RawJob) LogError(a ...interface{}) {
job.log.Error("Tube: ", job.tube, ", Job: ", job.id, ": Error: ", fmt.Sprint(a...))
}
// LogInfo function logs an info message regarding the job.
func (job *RawJob) LogInfo(a ...interface{}) {
job.log.Info("Tube: ", job.tube, ", Job: ", job.id, ": ", fmt.Sprint(a...))
}
// unmarshalErrorAction handles unmarshal error, depending on the user choice.
func (job *RawJob) unmarshalErrorAction(unmarshalErrorAction string) {
switch unmarshalErrorAction {
case ActionDeleteJob:
job.Delete()
case ActionBuryJob:
job.Bury()
default:
// Release as the default option as this would be the safest option for the user.
// We don't want someone to have some jobs being deleted if they are not aware of it.
job.Release()
}
}