使用Golang,25秒读取16GB文件


当今世界的任何计算机系统每天都会生成大量的日志或数据。随着系统的发展,将调试数据存储到数据库中是不可行的,因为它们是不可变的,并且只能用于分析和解决故障。所以大部分公司倾向于将日志存储在文件中,而这些文件通常位于本地磁盘中。

我们将使用Go语言,从一个大小为16GB的.txt.log文件中提取日志。

让我们开始编码……

首先,我们打开文件。对于任何文件的IO,我们都将使用标准的Go os.File。
f, err := os.Open(fileName)
if err != nil {
fmt.Println("cannot able to read the file", err)
return
}
// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

打开文件后,我们有以下两个选项可以选择:
  1. 逐行读取文件,这有助于减少内存紧张,但需要更多的时间。
  2. 一次将整个文件读入内存并处理该文件,这将消耗更多内存,但会显著减少时间。


由于文件太大,即16 GB,因此无法将整个文件加载到内存中。但是第一种选择对我们来说也是不可行的,因为我们希望在几秒钟内处理文件。

但你猜怎么着,还有第三种选择。瞧……相比于将整个文件加载到内存中,在Go语言中,我们还可以使用bufio.NewReader()将文件分块加载。
r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into buffer
buf = buf[:n]
if n == 0 {

 if err != nil {
   fmt.Println(err)
   break
 }
 if err == io.EOF {
   break
 }
 return err
}


一旦我们将文件分块,我们就可以分叉一个线程,即Go routine,同时处理多个文件区块。上述代码将修改为:
//sync pools to reuse the memory and decrease the preassure on Garbage Collector
linesPool := sync.Pool{New: func() interface{} {
    lines := make([]byte, 500*1024)
    return lines
}}
stringPool := sync.Pool{New: func() interface{} {
      lines := ""
      return lines
}}
slicePool := sync.Pool{New: func() interface{} {
       lines := make([]string, 100)
       return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {

 buf := linesPool.Get().([]byte)
 n, err := r.Read(buf)
 buf = buf[:n]
if n == 0 {
    if err != nil {
        fmt.Println(err)
        break
    }
    if err == io.EOF {
        break
    }
    return err
 }
nextUntillNewline, err := r.ReadBytes('\n')//read entire line

 if err != io.EOF {
     buf = append(buf, nextUntillNewline...)
 }

 wg.Add(1)
 go func() { 

    //process each chunk concurrently
    //start -> log start time, end -> log end time

    ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)
wg.Done()

 }()
}
wg.Wait()


上面的代码,引入了两个优化点:
  1. sync.Pool是一个强大的对象池,可以重用对象来减轻垃圾收集器的压力。我们将重用各个分片的内存,以减少内存消耗,大大加快我们的工作。
  2. Go Routines帮助我们同时处理缓冲区块,这大大提高了处理速度。


现在让我们实现ProcessChunk函数,它将处理以下格式的日志行。
2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

我们将根据命令行提供的时间戳提取日志。
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further                             
  var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logs
  logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow 
     noOfThread++
  }
length := len(logsSlice)
//traverse the chunk
 for i := 0; i < length; i += chunkSize {

     wg2.Add(1)
//process each chunk in saperate chunk
     go func(s int, e int) {
        for i:= s; i<e;i++{
           text := logsSlice[i]
if len(text) == 0 {
              continue
           }

        logParts := strings.SplitN(text, ",", 2)
        logCreationTimeString := logParts[0]
        logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
             fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)
             return
        }
// check if log's timestamp is inbetween our desired period
      if logCreationTime.After(start) && logCreationTime.Before(end) {

        fmt.Println(text)
       }
    }
    textSlice = nil
    wg2.Done()

 }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
//passing the indexes for processing
}  
wg2.Wait() //wait for a chunk to finish
logsSlice = nil


对上面的代码进行基准测试。以16 GB的日志文件为例,提取日志所需的时间约为25秒。

完整的代码示例如下:
func main() {

s := time.Now()
args := os.Args[1:]
if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
    fmt.Println("Please give proper command line arguments")
    return
}
startTimeArg := args[1]
finishTimeArg := args[3]
fileName := args[5]

file, err := os.Open(fileName)

if err != nil {
    fmt.Println("cannot able to read the file", err)
    return
}

defer file.Close() //close after checking err

queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
if err != nil {
    fmt.Println("Could not able to parse the start time", startTimeArg)
    return
}

queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
if err != nil {
    fmt.Println("Could not able to parse the finish time", finishTimeArg)
    return
}

filestat, err := file.Stat()
if err != nil {
    fmt.Println("Could not able to get the file stat")
    return
}

fileSize := filestat.Size()
offset := fileSize - 1
lastLineSize := 0

for {
    b := make([]byte, 1)
    n, err := file.ReadAt(b, offset)
    if err != nil {
        fmt.Println("Error reading file ", err)
        break
    }
    char := string(b[0])
    if char == "\n" {
        break
    }
    offset--
    lastLineSize += n
}

lastLine := make([]byte, lastLineSize)
_, err = file.ReadAt(lastLine, offset+1)

if err != nil {
    fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
    return
}

logSlice := strings.SplitN(string(lastLine), ",", 2)
logCreationTimeString := logSlice[0]

lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
    fmt.Println("can not able to parse time : ", err)
}

if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
    Process(file, queryStartTime, queryFinishTime)
}

fmt.Println("\nTime taken - ", time.Since(s))
}

func Process(f *os.File, start time.Time, end time.Time) error {

linesPool := sync.Pool{New: func() interface{} {
    lines := make([]byte, 250*1024)
    return lines
}}

stringPool := sync.Pool{New: func() interface{} {
    lines := ""
    return lines
}}

r := bufio.NewReader(f)

var wg sync.WaitGroup

for {
    buf := linesPool.Get().([]byte)

    n, err := r.Read(buf)
    buf = buf[:n]

    if n == 0 {
        if err != nil {
            fmt.Println(err)
            break
        }
        if err == io.EOF {
            break
        }
        return err
    }

    nextUntillNewline, err := r.ReadBytes('\n')

    if err != io.EOF {
        buf = append(buf, nextUntillNewline...)
    }

    wg.Add(1)
    go func() {
        ProcessChunk(buf, &linesPool, &stringPool, start, end)
        wg.Done()
    }()

}

wg.Wait()
return nil
}

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {

var wg2 sync.WaitGroup

logs := stringPool.Get().(string)
logs = string(chunk)

linesPool.Put(chunk)

logsSlice := strings.Split(logs, "\n")

stringPool.Put(logs)

chunkSize := 300
n := len(logsSlice)
noOfThread := n / chunkSize

if n%chunkSize != 0 {
    noOfThread++
}

for i := 0; i < (noOfThread); i++ {

    wg2.Add(1)
    go func(s int, e int) {
        defer wg2.Done() //to avaoid deadlocks
        for i := s; i < e; i++ {
            text := logsSlice[i]
            if len(text) == 0 {
                continue
            }
            logSlice := strings.SplitN(text, ",", 2)
            logCreationTimeString := logSlice[0]

            logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
            if err != nil {
                fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
                return
            }

            if logCreationTime.After(start) && logCreationTime.Before(end) {
                //fmt.Println(text)
            }
        }


    }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
}

wg2.Wait()
logsSlice = nil



原文链接:Reading 16GB File in Seconds, Golang (翻译:钟涛)

0 个评论

要回复文章请先登录注册