Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pcp2elasticsearch: support dynamic mapping #2117

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions qa/1130
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# PCP QA Test No. 1130
# checks basic pcp2elasticsearch functionality
#
# Copyright (c) 2017-2019 Red Hat.
# Copyright (c) 2017-2025 Red Hat.
#
seq=`basename $0`
echo "QA output created by $seq"
Expand All @@ -13,7 +13,7 @@ $python -c "from pcp import pmapi" >/dev/null 2>&1
[ $? -eq 0 ] || _notrun "python pcp pmapi module not installed"

# From Andreas 27 Jul 2022 ...
# [pcp2elasticsearch] now uses a method='PUT' argument, but this is
# [pcp2elasticsearch] now uses a method='POST' argument, but this is
# not available in Python versions before 3.3
#
__version=`$python --version 2>&1 | sed -e 's/Python //'`
Expand Down Expand Up @@ -90,11 +90,21 @@ wait
#
grep -q 'socat.* Remote end closed connection without response' $tmp.socat.err && \
_notrun "socat on this platform is not behaving as expected [closed conn]"
grep -q 'socat.* Connection reset by peer' $tmp.socat.log && \
_notrun "socat on this platform is not behaving as expected [reset conn]"

# From Paul 13 Jan 2025 ...
# Getting reset connection messages with socat even though exporter works
# correctly meaning the test is not being run. Happens with prior upstream
# exporter version also:
# ... socat[957217] E read(6, 0x558187c97000, 8192): Connection reset by peer
#
# Commenting out the check and allowing QA test to run
#
#grep -q 'socat.* Connection reset by peer' $tmp.socat.log && \
# _notrun "socat on this platform is not behaving as expected [reset conn]"

grep -q 'socat.* Broken pipe' $tmp.socat.log && \
_notrun "socat on this platform is not behaving as expected [broken pipe]"
grep -E -q '^PUT /+pcp HTTP/' $tmp.socat.err
grep -E -q '^POST /+pcp/_doc HTTP/' $tmp.socat.err
[ $? -eq 0 ] && echo "Found correct index in output"
grep -E -q '"hinv": {"ncpu": '$ncpu'}' $tmp.socat.err
[ $? -eq 0 ] && echo "Found correct value in output"
Expand All @@ -108,7 +118,7 @@ $pcp2elasticsearch -t 1 -s 1 -X QAHOST -x INDEX hinv.ncpu >$tmp.p2e.out 2>$tmp.p
sleep 2
$signal $pid 2>/dev/null
wait
grep -E -q '^PUT /+INDEX HTTP/' $tmp.socat.err
grep -E -q '^POST /+INDEX/_doc HTTP/' $tmp.socat.err
[ $? -eq 0 ] && echo "Found correct index in output"
grep -E -q '"@host-id": "QAHOST"' $tmp.socat.err
[ $? -eq 0 ] && echo "Found proper hostid in output"
Expand All @@ -123,7 +133,7 @@ sleep 2
$signal $pid 2>/dev/null
wait
echo "--- Start of received data ---"
grep -E '(mappings|slack)' $tmp.socat.err | sed -e 's,< [1-9].*,,g' >$tmp.archive.data
grep -E '(slack)' $tmp.socat.err | sed -e 's,< [1-9].*,,g' >$tmp.archive.data
while read -r line
do
echo $line | _filter_prec | pmjson | LC_COLLATE=POSIX sort | tr -d ,
Expand Down
75 changes: 0 additions & 75 deletions qa/1130.out
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,6 @@ Found correct index in output
Found proper hostid in output
=== 3. pcp2elasticsearch full-blown archive replay session ===
--- Start of received data ---
"type": "epoch_milli"
"type": "string"
"@host-id": {
"@timestamp": {
}
}
"properties": {
}
"pcp-metric": {
}
"ignore": 400
"mappings": {
}
{
}
"@id": "000001 /usr/lib/systemd/systemd --switched-root --system --deserialize 21"
"@id": "1 minute"
"@id": "cpu0"
Expand Down Expand Up @@ -86,21 +71,6 @@ Found proper hostid in output
}
}
{
}
"type": "epoch_milli"
"type": "string"
"@host-id": {
"@timestamp": {
}
}
"properties": {
}
"pcp-metric": {
}
"ignore": 400
"mappings": {
}
{
}
"@id": "cpu0"
"user": 0.0
Expand Down Expand Up @@ -181,21 +151,6 @@ Found proper hostid in output
}
}
{
}
"type": "epoch_milli"
"type": "string"
"@host-id": {
"@timestamp": {
}
}
"properties": {
}
"pcp-metric": {
}
"ignore": 400
"mappings": {
}
{
}
"@id": "cpu0"
"user": 0.0
Expand Down Expand Up @@ -276,21 +231,6 @@ Found proper hostid in output
}
}
{
}
"type": "epoch_milli"
"type": "string"
"@host-id": {
"@timestamp": {
}
}
"properties": {
}
"pcp-metric": {
}
"ignore": 400
"mappings": {
}
{
}
"@id": "cpu0"
"user": 9.999
Expand Down Expand Up @@ -371,21 +311,6 @@ Found proper hostid in output
}
}
{
}
"type": "epoch_milli"
"type": "string"
"@host-id": {
"@timestamp": {
}
}
"properties": {
}
"pcp-metric": {
}
"ignore": 400
"mappings": {
}
{
}
"@id": "cpu0"
"user": 0.0
Expand Down
27 changes: 4 additions & 23 deletions src/pcp2elasticsearch/pcp2elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env pmpython
#
# Copyright (C) 2015-2021 Marko Myllynen <[email protected]>
# Copyright (C) 2014-2018,2022 Red Hat.
# Copyright (C) 2014-2018,2025 Red Hat.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
Expand Down Expand Up @@ -47,7 +47,7 @@
CONFVER = 1
ES_SERVER = "http://localhost:9200/"
ES_INDEX = "pcp"
ES_SEARCH_TYPE = "pcp-metric"
ES_SEARCH_TYPE = "_doc"

class pcp2elasticsearch(object):
""" PCP to Elasticsearch """
Expand Down Expand Up @@ -429,26 +429,6 @@ def write_es(self, timestamp):

ts = self.context.datetime_to_secs(self.pmfg_ts(), PM_TIME_MSEC)

try:
body = {'ignore': 400,
'mappings': {'pcp-metric':
{'properties':{'@timestamp':{'type':'epoch_milli'},
'@host-id':{'type':'string'}}}}}
data = json.dumps(body).encode('utf-8')
headers = {'content-type': 'application/json'}
url = self.es_server + '/' + self.es_index
req = httprequest.Request(url=url, data=data, headers=headers, method='PUT')
with httprequest.urlopen(req) as f:
f.close()
if self.es_failed:
sys.stderr.write("Reconnected to Elasticsearch server %s.\n" % (self.es_server))
self.es_failed = False
except Exception as put_failed:
if not self.es_failed:
sys.stderr.write("Cannot connect to Elasticsearch server %s: %s, continuing.\n" % (self.es_server, str(put_failed)))
self.es_failed = True
return

# Assemble all metrics into a single document
# Use @-prefixed keys for metadata not coming in from PCP metrics
es_doc = {'@host-id': self.es_hostid, '@timestamp': long(ts)}
Expand Down Expand Up @@ -514,8 +494,9 @@ def write_es(self, timestamp):
insts.append({inst_key: name, last_part: value})

try:
headers = {'content-type': 'application/json'}
data = json.dumps(es_doc).encode('utf-8')
url = self.es_server + '/' + self.es_index + '/' + self.es_search_type
url = self.es_server + self.es_index + '/' + self.es_search_type
req = httprequest.Request(url=url, data=data, headers=headers, method='POST')
with httprequest.urlopen(req) as f:
f.close()
Expand Down
Loading