...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package logger
16
17 import (
18 "fmt"
19 "io"
20 "sync"
21 )
22
23
24 type StreamHandler func(writer io.Writer, record *Record, formatter *Formatter) error
25
26
27 type Opener interface {
28 Open() (io.WriteCloser, error)
29 }
30
31
32
33 type Stream struct {
34 writer io.Writer
35 closer io.Closer
36 formatter *Formatter
37 mutex sync.RWMutex
38 opener Opener
39 minimumLevel int
40 maximumLevel int
41 reopen bool
42 isDisabled bool
43 handler StreamHandler
44 }
45
46
47 func NewStream() *Stream {
48 return &Stream{
49 formatter: NewFormatter(),
50 minimumLevel: MinimumLevel,
51 maximumLevel: MaximumLevel,
52 handler: StreamHandlerDefault,
53 }
54 }
55
56
57 func (s *Stream) Lock() {
58 s.mutex.Lock()
59 }
60
61
62 func (s *Stream) Unlock() {
63 s.mutex.Unlock()
64 }
65
66
67 func (s *Stream) RLock() {
68 s.mutex.RLock()
69 }
70
71
72 func (s *Stream) RUnlock() {
73 s.mutex.RUnlock()
74 }
75
76
77 func (s *Stream) SetStreamHandler(handler StreamHandler) *Stream {
78 s.mutex.Lock()
79 defer s.mutex.Unlock()
80
81 if handler != nil {
82 s.handler = handler
83 }
84
85 return s
86 }
87
88
89 func (s *Stream) SetWriter(writer io.Writer) error {
90 s.mutex.Lock()
91 defer s.mutex.Unlock()
92
93 if s.writer == writer {
94 return nil
95 }
96
97 if s.closer != nil {
98 err := s.closer.Close()
99
100 if err != nil {
101 return NewRuntimeError("cannot close stream", err)
102 }
103 }
104
105 s.writer = writer
106 s.closer = nil
107
108 return nil
109 }
110
111
112 func (s *Stream) SetWriteCloser(writeCloser io.WriteCloser) error {
113 s.mutex.Lock()
114 defer s.mutex.Unlock()
115
116 if (s.writer == writeCloser) && (s.closer == writeCloser) {
117 return nil
118 }
119
120 if s.closer != nil {
121 err := s.closer.Close()
122
123 if err != nil {
124 return NewRuntimeError("cannot close stream", err)
125 }
126 }
127
128 s.writer = writeCloser
129 s.closer = writeCloser
130
131 return nil
132 }
133
134
135 func (s *Stream) SetOpener(opener Opener) *Stream {
136 s.mutex.Lock()
137 defer s.mutex.Unlock()
138
139 s.opener = opener
140
141 return s
142 }
143
144
145 func (s *Stream) Reopen() *Stream {
146 s.reopen = true
147
148 return s
149 }
150
151
152 func (s *Stream) Enable() Handler {
153 s.mutex.Lock()
154 defer s.mutex.Unlock()
155
156 s.isDisabled = false
157
158 return s
159 }
160
161
162 func (s *Stream) Disable() Handler {
163 s.mutex.Lock()
164 defer s.mutex.Unlock()
165
166 s.isDisabled = true
167
168 return s
169 }
170
171
172 func (s *Stream) IsEnabled() bool {
173 s.mutex.RLock()
174 defer s.mutex.RUnlock()
175
176 return !s.isDisabled
177 }
178
179
180 func (s *Stream) SetFormatter(formatter *Formatter) Handler {
181 s.mutex.Lock()
182 defer s.mutex.Unlock()
183
184 s.formatter = formatter
185
186 return s
187 }
188
189
190 func (s *Stream) GetFormatter() *Formatter {
191 s.mutex.RLock()
192 defer s.mutex.RUnlock()
193
194 return s.formatter
195 }
196
197
198 func (s *Stream) SetLevel(level int) Handler {
199 s.mutex.Lock()
200 defer s.mutex.Unlock()
201
202 s.minimumLevel = level
203 s.maximumLevel = level
204
205 return s
206 }
207
208
209 func (s *Stream) SetMinimumLevel(level int) Handler {
210 s.mutex.Lock()
211 defer s.mutex.Unlock()
212
213 s.minimumLevel = level
214
215 return s
216 }
217
218
219 func (s *Stream) GetMinimumLevel() int {
220 s.mutex.RLock()
221 defer s.mutex.RUnlock()
222
223 return s.minimumLevel
224 }
225
226
227 func (s *Stream) SetMaximumLevel(level int) Handler {
228 s.mutex.Lock()
229 defer s.mutex.Unlock()
230
231 s.maximumLevel = level
232
233 return s
234 }
235
236
237 func (s *Stream) GetMaximumLevel() int {
238 s.mutex.RLock()
239 defer s.mutex.RUnlock()
240
241 return s.maximumLevel
242 }
243
244
245 func (s *Stream) SetLevelRange(min, max int) Handler {
246 s.mutex.Lock()
247 defer s.mutex.Unlock()
248
249 s.minimumLevel = min
250 s.maximumLevel = max
251
252 return s
253 }
254
255
256 func (s *Stream) GetLevelRange() (min, max int) {
257 s.mutex.RLock()
258 defer s.mutex.RUnlock()
259
260 return s.minimumLevel, s.maximumLevel
261 }
262
263
264 func (s *Stream) Emit(record *Record) error {
265 s.mutex.Lock()
266 defer s.mutex.Unlock()
267
268 if s.reopen {
269 if s.closer != nil {
270 err := s.closer.Close()
271
272 if err != nil {
273 return NewRuntimeError("cannot close stream", err)
274 }
275
276 s.writer = nil
277 s.closer = nil
278 }
279
280 s.reopen = false
281 }
282
283 if (s.writer == nil) && (s.closer == nil) && (s.opener != nil) {
284 writer, err := s.opener.Open()
285
286 if err != nil {
287 return NewRuntimeError("cannot open stream", err)
288 }
289
290 s.writer = writer
291 s.closer = writer
292 }
293
294 if s.writer != nil {
295 if err := s.handler(s.writer, record, s.formatter); err != nil {
296 return NewRuntimeError("cannot write to stream", err)
297 }
298 }
299
300 return nil
301 }
302
303
304 func (s *Stream) Close() error {
305 s.mutex.Lock()
306 defer s.mutex.Unlock()
307
308 if s.closer != nil {
309 err := s.closer.Close()
310
311 if err != nil {
312 return NewRuntimeError("cannot close stream", err)
313 }
314
315 s.writer = nil
316 s.closer = nil
317 }
318
319 return nil
320 }
321
322
323 func StreamHandlerDefault(writer io.Writer, record *Record, formatter *Formatter) error {
324 message, err := formatter.Format(record)
325
326 if err != nil {
327 return NewRuntimeError("cannot format record", err)
328 }
329
330 if _, err := fmt.Fprintln(writer, message); err != nil {
331 return NewRuntimeError("cannot write to stream", err)
332 }
333
334 return nil
335 }
336
337
338 func StreamHandlerNDJSON(writer io.Writer, record *Record, _ *Formatter) error {
339 bytes, err := record.ToJSON()
340
341 if err != nil {
342 return NewRuntimeError("cannot format record", err)
343 }
344
345 if _, err := fmt.Fprintln(writer, string(bytes)); err != nil {
346 return NewRuntimeError("cannot write to stream", err)
347 }
348
349 return nil
350 }
351
View as plain text