...

Source file src/gitlab.com/tymonx/go-logger/logger/worker.go

Documentation: gitlab.com/tymonx/go-logger/logger

     1  // Copyright 2020 Tymoteusz Blazejczyk
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package logger
    16  
    17  import (
    18  	"os"
    19  	"path/filepath"
    20  	"sync"
    21  	"time"
    22  )
    23  
    24  // These constants define default values for Worker.
    25  const (
    26  	DefaultQueueLength = 4096
    27  )
    28  
// A Worker represents an active logger worker thread. It handles formatting
// received log messages and I/O operations.
//
// A Worker must be created with NewWorker (or obtained through GetWorker),
// which allocates its channels and starts the goroutine that consumes them.
type Worker struct {
	// flush carries one WaitGroup per Flush call to the worker goroutine,
	// which calls Done on it after draining the queued records.
	flush chan *sync.WaitGroup
	// records is the buffered queue of log records waiting to be emitted.
	// SetQueueLength replaces it when the requested capacity differs.
	records chan *Record
	// mutex is write-locked by SetQueueLength while it replaces records.
	// NOTE(review): the worker goroutine reads records without taking it.
	mutex sync.RWMutex
}
    36  
    37  var gWorkerOnce sync.Once   // nolint:gochecknoglobals
    38  var gWorkerInstance *Worker // nolint:gochecknoglobals
    39  
    40  // NewWorker creates a new Worker object.
    41  func NewWorker() *Worker {
    42  	worker := &Worker{
    43  		flush:   make(chan *sync.WaitGroup, 1),
    44  		records: make(chan *Record, DefaultQueueLength),
    45  	}
    46  
    47  	go worker.run()
    48  
    49  	return worker
    50  }
    51  
    52  // GetWorker returns logger worker instance. First call to it creates and
    53  // starts logger worker thread.
    54  func GetWorker() *Worker {
    55  	gWorkerOnce.Do(func() {
    56  		gWorkerInstance = NewWorker()
    57  	})
    58  
    59  	return gWorkerInstance
    60  }
    61  
    62  // SetQueueLength sets logger worker thread queue length for log messages.
    63  func (w *Worker) SetQueueLength(length int) *Worker {
    64  	w.mutex.Lock()
    65  	defer w.mutex.Unlock()
    66  
    67  	if length <= 0 {
    68  		length = DefaultQueueLength
    69  	}
    70  
    71  	if cap(w.records) != length {
    72  		w.records = make(chan *Record, length)
    73  	}
    74  
    75  	return w
    76  }
    77  
    78  // Flush flushes all log messages.
    79  func (w *Worker) Flush() *Worker {
    80  	flush := new(sync.WaitGroup)
    81  
    82  	flush.Add(1)
    83  	w.flush <- flush
    84  	flush.Wait()
    85  
    86  	return w
    87  }
    88  
    89  // Run processes all incoming log messages from loggers. It emits received log
    90  // records to all added log handlers for specific logger.
    91  func (w *Worker) run() {
    92  	for {
    93  		select {
    94  		case flush := <-w.flush:
    95  			for records := len(w.records); records > 0; records-- {
    96  				record := <-w.records
    97  
    98  				if record != nil {
    99  					w.emit(record.logger, record)
   100  				}
   101  			}
   102  
   103  			if flush != nil {
   104  				flush.Done()
   105  			}
   106  		case record := <-w.records:
   107  			if record != nil {
   108  				w.emit(record.logger, record)
   109  			}
   110  		}
   111  	}
   112  }
   113  
   114  // emit prepares provided log record and it dispatches to all added log
   115  // handlers for further formatting and specific I/O implementation operations.
   116  func (*Worker) emit(logger *Logger, record *Record) {
   117  	var err error
   118  
   119  	record.Type = DefaultTypeName
   120  	record.File.Name = filepath.Base(record.File.Path)
   121  	record.File.Function = filepath.Base(record.File.Function)
   122  	record.Timestamp.Created = record.Time.Format(time.RFC3339)
   123  
   124  	record.Address, err = getAddress()
   125  
   126  	if err != nil {
   127  		printError(NewRuntimeError("cannot get local IP address", err))
   128  	}
   129  
   130  	record.Hostname, err = getHostname()
   131  
   132  	if err != nil {
   133  		printError(NewRuntimeError("cannot get local hostname", err))
   134  	}
   135  
   136  	logger.mutex.RLock()
   137  	defer logger.mutex.RUnlock()
   138  
   139  	record.Name = logger.name
   140  	record.ID, err = logger.idGenerator.Generate()
   141  
   142  	if err != nil {
   143  		printError(NewRuntimeError("cannot generate ID", err))
   144  	}
   145  
   146  	if record.Name == "" {
   147  		record.Name = filepath.Base(os.Args[0])
   148  	}
   149  
   150  	for _, handler := range logger.handlers {
   151  		min, max := handler.GetLevelRange()
   152  
   153  		if handler.IsEnabled() && (record.Level.Value >= min) && (record.Level.Value <= max) {
   154  			err = handler.Emit(record)
   155  
   156  			if err != nil {
   157  				printError(NewRuntimeError("cannot emit record", err))
   158  			}
   159  		}
   160  	}
   161  }
   162  

View as plain text