Skip to content

Commit

Permalink
add first_reading_at to devices, allow passing limit to readings endp…
Browse files Browse the repository at this point in the history
…oint
  • Loading branch information
timcowlishaw committed Nov 21, 2024
1 parent b71ba6d commit f993c3e
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 4 deletions.
4 changes: 4 additions & 0 deletions app/models/kairos.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ def self.query params
rollup_value = params[:rollup].to_i
rollup_unit = Kairos.get_timespan( params[:rollup].gsub(rollup_value.to_s,'') )

limit = params[:limit]&.to_i

device = Device.find(params[:id])

if sensor_key = params[:sensor_key]
Expand All @@ -26,6 +28,7 @@ def self.query params

component = device.find_component_by_sensor_id(sensor_id)


unless component
return {
device_id: params[:id],
Expand All @@ -44,6 +47,7 @@ def self.query params
metrics = [{
tags: { device_id: params[:id] },
name: sensor_key,
limit: limit,
aggregators: [
{
name: function,
Expand Down
4 changes: 4 additions & 0 deletions app/models/raw_storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def store data, mac, version, ip, raise_errors=false
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
end

if !device.first_reading_at || parsed_ts < device.first_reading_at
device.update_columns(first_reading_at: parsed_ts)
end

forward_readings(device, [sql_data])
rescue Exception => e

Expand Down
17 changes: 15 additions & 2 deletions app/models/storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ def store device, readings
kairos_publish(parsed_reading[:_data])
readings_to_forward << parsed_reading[:sql_data]
if index == 0
update_device(device, parsed_reading[:parsed_ts], parsed_reading[:sql_data])
update_device_last_data(device, parsed_reading[:parsed_ts], parsed_reading[:sql_data])
end

if index == (readings.length - 1)
update_device_first_reading_at(device, parsed_reading[:parsed_ts])
end

rescue Exception => e
Sentry.capture_exception(e)
Expand All @@ -22,7 +25,17 @@ def store device, readings
forward_readings(device, readings_to_forward)
end

def update_device(device, parsed_ts, sql_data)
def update_device_first_reading_at(device, parsed_ts)
return if parsed_ts <= Time.at(0)
device.transaction do
device.lock!
if !device.first_reading_at || parsed_ts < device.first_reading_at
device.update_columns(first_reading_at: parsed_ts)
end
end
end

def update_device_last_data(device, parsed_ts, sql_data)
return if parsed_ts <= Time.at(0)
device.transaction do
device.lock!
Expand Down
5 changes: 5 additions & 0 deletions db/migrate/20241114053104_add_first_reading_at_to_devices.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class AddFirstReadingAtToDevices < ActiveRecord::Migration[6.1]
def change
add_column :devices, :first_reading_at, :timestamp
end
end
3 changes: 2 additions & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2024_11_13_155952) do
ActiveRecord::Schema.define(version: 2024_11_14_053104) do

# These are extensions that must be enabled in order to support this database
enable_extension "adminpack"
Expand Down Expand Up @@ -109,6 +109,7 @@
t.string "hardware_slug_override"
t.boolean "precise_location", default: true, null: false
t.boolean "enable_forwarding", default: false, null: false
t.datetime "first_reading_at"
t.index ["device_token"], name: "index_devices_on_device_token", unique: true
t.index ["geohash"], name: "index_devices_on_geohash"
t.index ["last_reading_at"], name: "index_devices_on_last_reading_at"
Expand Down
18 changes: 18 additions & 0 deletions lib/tasks/devices.rake
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,22 @@ namespace :devices do
device.save!(validate: false)
end
end

task :set_first_reading_at => :environment do
from_date = Device.order(:created_at).first.created_at
Device.where(state: "has_published").each do |device|
component = device.components.where("last_reading_at IS NOT NULL").first
if component
begin
readings = Kairos.query(id: device.id, sensor_key: component.key, function: "first", rollup: "1s", limit: 1, start_absolute: from_date)["readings"]
first_reading_timestamp = readings[0][0] if readings&.any?
device.update_columns(first_reading_at: first_reading_timestamp) if first_reading_timestamp
rescue JsonNull
nil
end
end
print "."
end
print "\n"
end
end
28 changes: 28 additions & 0 deletions spec/models/raw_storer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,34 @@ def to_ts(time)
storer.store(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true)
end

context "when the device has no first_reading_at timestamp" do
it "sets the first_reading_at timestamp" do
ts = Time.parse(json[:timestamp])
storer.store(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true)
expect(device.reload.first_reading_at).to eq(ts)
end
end

context "when the device's first_reading_at timestamp is after the reading timestamp" do
it "updates the device's first_reading_at timestamp" do
ts = Time.parse(json[:timestamp])
device.first_reading_at = ts + 1.day
device.save
storer.store(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true)
expect(device.reload.first_reading_at).to eq(ts)
end
end

context "when the device's first_reading_at timestamp is before the reading timestamp" do
it "does not update the device's first_reading_at timestamp" do
ts = Time.parse(json[:timestamp])
previous_timestamp = device.first_reading_at = ts - 1.day
device.save
storer.store(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true)
expect(device.reload.first_reading_at).to eq(previous_timestamp)
end
end

it "will not be created with invalid future timestamp" do
ts = { timestamp: to_ts(2.days.from_now) }
includes_proxy = double({ where: double({last: device.reload})})
Expand Down
29 changes: 29 additions & 0 deletions spec/models/storer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,35 @@
end.not_to raise_error
end

context "when the device has no first_reading_at timestamp" do
it "sets the first_reading_at timestamp" do
ts = Time.parse(@data["recorded_at"])
storer.store(device, [@data])
expect(device.reload.first_reading_at).to eq(ts)
end
end

context "when the device's first_reading_at timestamp is after the reading timestamp" do
it "updates the device's first_reading_at timestamp" do
ts = Time.parse(@data["recorded_at"])
device.first_reading_at = ts + 1.day
device.save
storer.store(device, [@data])
expect(device.reload.first_reading_at).to eq(ts)
end
end

context "when the device's first_reading_at timestamp is before the reading timestamp" do
it "does not update the device's first_reading_at timestamp" do
ts = Time.parse(@data["recorded_at"])
previous_timestamp = device.first_reading_at = ts - 1.day
device.save
storer.store(device, [@data])
expect(device.reload.first_reading_at).to eq(previous_timestamp)
end
end


it "updates the component last_reading_at timestamp for each of the provided sensors" do
expect(device).to receive(:update_component_timestamps).with(
Time.parse(@data['recorded_at']),
Expand Down
1 change: 0 additions & 1 deletion spec/requests/v0/password_resets_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@
it "can reset password with valid token" do
expect(user.authenticate('newpass')).to be_falsey
j = api_put "password_resets/#{user.password_reset_token}", { password: 'newpass' }
p response
expect(j["username"]).to eq(user.username)
expect(response.status).to eq(200)

Expand Down

0 comments on commit f993c3e

Please sign in to comment.