Skip to content

Commit

Permalink
feat: 审计采集数据分批发送 --story=118658293 (#449)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx authored Jul 22, 2024
1 parent 139086d commit f33fbee
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 23 deletions.
2 changes: 1 addition & 1 deletion pkg/bkmonitorbeat/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v3.41.x
v3.42.x
4 changes: 2 additions & 2 deletions pkg/bkmonitorbeat/beater/taskfactory/socketsnapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ package taskfactory
import (
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/configs"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/define"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/tasks/socketsanpshot"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/tasks/socketsnapshot"
)

func init() {
SetTaskConfigByName(define.ModuleSocketSnapshot, func() define.TaskMetaConfig { return new(configs.SocketSnapshotConfig) })
Register(define.ModuleSocketSnapshot, socketsanpshot.New)
Register(define.ModuleSocketSnapshot, socketsnapshot.New)
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (p *OomParser) StreamOoms(ctx context.Context, outStream chan<- *OomInstanc
outStream <- oomCurrentInstance
}
case <-ctx.Done():
logger.Errorf("exiting analyzeLines. OOM events will not be reported.")
logger.Info("exiting analyzeLines. OOM events will not be reported.")
return
}
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/bkmonitorbeat/tasks/procbin/gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ func (g *Gather) Run(ctx context.Context, e chan<- define.Event) {
return
}

total := len(procs)
if total <= 0 {
return
}

const size = 5000
var procbins []ProcBin
pcs := make(map[pidCreated]struct{})
for i := 0; i < len(procs); i++ {
proc := procs[i]
if proc.Cmd == "" {
continue
}

pc := pidCreated{pid: proc.Pid, created: proc.Created}
si := readStatInfo(pc, proc.Exe, g.config.MaxBytes)
pcs[pc] = struct{}{}
Expand All @@ -79,8 +81,16 @@ func (g *Gather) Run(ctx context.Context, e chan<- define.Event) {
Change: si.Change.Unix(),
Access: si.Access.Unix(),
})
if len(procbins) >= size {
e <- &Event{dataid: g.config.DataID, data: procbins, utcTime: now}
procbins = make([]ProcBin, 0) // 重置状态
}
}

// 确保数据完整发送
if len(procbins) > 0 {
e <- &Event{dataid: g.config.DataID, data: procbins, utcTime: now}
}
e <- &Event{dataid: g.config.DataID, data: procbins, utcTime: now}
cleanupCached(pcs)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ func getConcernPidInodes(pids []int32) map[int32][]uint64 {

inodes, err := getProcInodes("/proc", pid)
if err != nil {
logger.Errorf("failed to get /proc info: %v", err)
continue
}
ret[pid] = inodes
Expand Down
24 changes: 17 additions & 7 deletions pkg/bkmonitorbeat/tasks/procsnapshot/gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package procsnapshot

import (
"context"
"math"
"sync/atomic"
"time"

Expand Down Expand Up @@ -52,13 +53,22 @@ func (g *Gather) Run(ctx context.Context, e chan<- define.Event) {
return
}

ret := make([]ProcMeta, 0)
for i := 0; i < len(procs); i++ {
p := procs[i]
if p.Cmd == "" {
continue
total := len(procs)
if total <= 0 {
return
}

const size = 5000
batch := int(math.Ceil(float64(total) / float64(size))) // 向上取整
var start, end int
for i := 0; i < batch; i++ {
end = start + size
if end >= total {
end = total
}
ret = append(ret, p)

data := procs[start:end]
e <- &Event{dataid: g.config.DataID, data: data, utcTime: now}
start = end
}
e <- &Event{dataid: g.config.DataID, data: ret, utcTime: now}
}
8 changes: 6 additions & 2 deletions pkg/bkmonitorbeat/tasks/procsnapshot/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,16 @@ func AllProcsMeta() ([]ProcMeta, error) {
time.Sleep(time.Millisecond * socketPerformanceSleep)
}

stat, err := getProcMeta(pid)
meta, err := getProcMeta(pid)
if err != nil {
logger.Warnf("get process meta data failed, pid: %d, err: %v", pid, err)
continue
}
ret = append(ret, stat)
// 采集不到 cmd 的进程忽略
if meta.Cmd == "" {
continue
}
ret = append(ret, meta)
}

return ret, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

package socketsanpshot
package socketsnapshot

import (
"time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

package socketsanpshot
package socketsnapshot

import (
"context"
"math"
"sync/atomic"
"time"

Expand Down Expand Up @@ -62,5 +63,23 @@ func (g *Gather) Run(ctx context.Context, e chan<- define.Event) {
logger.Errorf("faile to get procs sockets: %v", err)
return
}
e <- &Event{dataid: g.config.DataID, data: sockets, utcTime: now}

total := len(sockets)
if total <= 0 {
return
}

const size = 10000
batch := int(math.Ceil(float64(total) / float64(size))) // 向上取整
var start, end int
for i := 0; i < batch; i++ {
end = start + size
if end >= total {
end = total
}

data := sockets[start:end]
e <- &Event{dataid: g.config.DataID, data: data, utcTime: now}
start = end
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

package socketsanpshot
package socketsnapshot

import (
"fmt"
Expand Down

0 comments on commit f33fbee

Please sign in to comment.