#!/usr/bin/env python3
# Import commercial microsegments code to use some of its data
# reading and processing functions
import com_mseg as cm
import numpy as np
import numpy.lib.recfunctions as recfn
import re
import warnings
import json
import csv


class EIAData(object):
    """Class of variables naming the EIA data files to be imported.

    Attributes:
        cpl_data (str): File name for the EIA AEO technology data.
        tpp_data (str): File name for the EIA AEO time preference premium data.
    """

    def __init__(self):
        self.cpl_data = 'ktek.csv'
        self.tpp_data = 'kprem.txt'


class UsefulVars(object):
    """Set up class to contain what would otherwise be global variables.

    Attributes:
        json_in (str): File name for the input JSON database.
        json_out (str): File name for the JSON output from this script.
        aeo_metadata (str): File name for the custom AEO metadata JSON.
        cpl_data_skip_lines (int): The number of lines of preamble that
            must be skipped at the beginning of the EIA AEO technology
            data file.
        columns_to_keep (list): A list of strings defining the columns
            from the EIA AEO technology data file that are required.
        tpp_data_skip_lines (int): The number of lines of preamble
            that must be skipped before finding data in the EIA AEO
            time preference premium data file.
        tpp_dtypes (list): A list of tuples in the format of a numpy
            dtype definition, specifying the expected dtype and desired
            column headings for the time preference premium data.
    """

    def __init__(self):
        self.json_in = 'cpl_res_cdiv.json'
        self.json_out = 'cpl_res_com_cdiv.json'
        self.aeo_metadata = 'metadata.json'
        self.cpl_data_skip_lines = 100
        self.columns_to_keep = ['t', 'v', 'r', 's', 'f', 'eff', 'c1', 'c2',
                                'Life', 'y1', 'y2', 'technology name']
        self.tpp_data_skip_lines = 100
        self.tpp_dtypes = [('Proportion', 'f8'), ('Time Pref Premium', 'f8'),
                           ('Year', 'i4'), ('End Use', 'U32')]


class UsefulDicts(object):
    """Set up class for dicts to relate different data file formats.

    Attributes:
        kprem_endusedict (dict): Keys are the end use strings used in
            the JSON database, and values are the corresponding strings
            found in the time preference premium data. Further
            conversion to numeric indices used in other EIA data files
            can be performed using the dicts found in com_mseg.py.
    """

    def __init__(self):
        self.kprem_endusedict = {
            'heating': 'Space Heating',
            'cooling': 'Space Cooling',
            'water heating': 'Hot Water Heating',
            'ventilation': 'Ventilation',
            'cooking': 'Cooking',
            'lighting': 'Lighting',
            'refrigeration': 'Refrigeration'}


def units_id(sel, flag):
    """Provides a units text string for a specified microsegment.

    Depending on the end use number given in the sel list, which
    specifies the entire microsegment, this function returns the
    string that best describes the units for that end use. Using
    this function ensures that the units are consistent throughout
    the output data. The unit definitions are based on the preamble
    text in the EIA AEO commercial buildings technology data file.

    Args:
        sel (list): A list of four integers that, together, define a
            microsegment, and correspond to the census division,
            building type, end use, and fuel type, in that order.
        flag (str): Indicates the type of units to be returned, either
            'cost' or 'performance'.

    Returns:
        A text string of the appropriate units for the input arguments
        to the function.
    """
    # For readability, assign the end use number to a clearly named variable
    enduse = sel[2]

    # Determine units depending on whether this function was called
    # for cost or performance units
    if flag == 'cost':
        if enduse == 4:  # ventilation
            theunits = '2013$/1000 cfm'
        elif enduse == 6:  # lighting
            theunits = '2013$/1000 lm'
        else:
            theunits = '2013$/kBTU out/hr'
    elif flag == 'performance':
        if enduse == 4:  # ventilation
            theunits = 'cfm-hr/BTU in'
        elif enduse == 6:  # lighting
            theunits = 'lm/W'
        else:
            theunits = 'BTU out/BTU in'

    return theunits
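
# Illustrative usage of units_id (end use 4 corresponds to ventilation in
# the numeric coding used by these data; the sel values here are
# hypothetical):
#   >>> units_id([1, 2, 4, 1], 'cost')
#   '2013$/1000 cfm'
#   >>> units_id([1, 2, 4, 1], 'performance')
#   'cfm-hr/BTU in'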


def tech_data_selector(tech_data, sel):
    """ From the full structured array of cost, performance, and
    lifetime data from the AEO, extract a group of data using numeric
    indices generated from the text indices at the leaf nodes of the
    input microsegments JSON. Each group of data extracted by this
    function will correspond to multiple technologies and performance
    levels and will require further processing. """

    # Determine whether the 'r' column should be matched against the
    # building type or the census division, based on the end use
    # (building type for ventilation, lighting, and refrigeration)
    if sel[2] in [4, 6, 7]:
        tmp = sel[1]  # use building type
    else:
        tmp = sel[0]  # use census division

    # Filter technology data based on the specified census
    # division or building type, end use, and fuel type
    filtered = tech_data[np.all([tech_data['r'] == tmp,
                                 tech_data['s'] == sel[2],
                                 tech_data['f'] == sel[3]], axis=0)]

    return filtered


def sd_data_selector(sd_data, sel, years):
    """ From the full structured array of service demand data from the
    AEO, extract just the service demand data corresponding to the
    census division, building type, end use, and fuel type specified by
    each leaf node in the input microsegments JSON. Each group of data
    is converted into two outputs, 1) a numpy array of service demand
    summed across the three specified markets (column named 'd'), with
    rows for each technology and performance level combination and
    columns for each year, and 2) a list of technology names for
    each row of the service demand numpy array (the other output). """

    # Filter service demand data based on the specified census
    # division, building type, end use, and fuel type
    filtered = sd_data[np.all([sd_data['r'] == sel[0],
                               sd_data['b'] == sel[1],
                               sd_data['s'] == sel[2],
                               sd_data['f'] == sel[3]], axis=0)]

    # Identify each technology and performance level using the text
    # in the description field since the technology type and vintage
    # numeric codes are not well-matched to individual technology and
    # performance levels; remove empty strings from the list
    technames = list(np.unique(filtered['Description']))
    technames = [x for x in technames if x != '']

    # Set up numpy array to store restructured data, in which each row
    # will correspond to a single technology
    sd = np.zeros((len(technames), len(years)))

    # Combine the service demand for the three markets ['d'] in the data
    for idx, name in enumerate(technames):

        # Extract entries for a given technology name
        entries = filtered[filtered['Description'] == name]

        # Calculate the sum of all year columns and write it to the
        # appropriate row in the sd array (note that the .view()
        # function converts the structured array into a standard
        # numpy array, which allows the use of the .sum() function)
        sd[idx, ] = np.sum(
            entries[list(map(str, years))].view(('<f8', len(years))), axis=0)

    # Note that each row in sd corresponds to a single performance
    # level for a single technology and the rows are in the same order
    # as the technames list
    return sd, technames
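
# A minimal sketch of the .view() summing pattern used above, with a
# hypothetical two-row structured array holding the year columns '2014'
# and '2015' (the exact behavior of .view() on multi-field selections can
# vary with numpy version):
#   >>> a = np.array([(1.0, 2.0), (3.0, 4.0)],
#   ...              dtype=[('2014', '<f8'), ('2015', '<f8')])
#   >>> np.sum(a[['2014', '2015']].view(('<f8', 2)), axis=0)
#   array([ 4.,  6.])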


def single_tech_selector(tech_array, specific_name):
    """Extracts a single technology from tech data for an entire microsegment.

    Each microsegment comprises multiple technologies. Cost,
    performance, and lifetime data are needed for each technology in a
    microsegment. This function separates out those data for a specific
    technology from all of the technologies in the microsegment so that
    they can be processed and further restructured for later output.

    Args:
        tech_array (numpy.ndarray): EIA technology characteristics
            data available for a single microsegment, including cost,
            performance, and lifetime data for (typically multiple)
            performance scenarios for each technology applicable to
            that microsegment.
        specific_name (str): The name of the technology to be extracted.

    Returns:
        A numpy structured array with the same columns as other tech
        data, but with only the rows corresponding to the technology
        indicated by specific_name.
    """
    # Initialize a list of rows to remove from the numpy array
    # that do not correspond to the specified technology
    rows_to_remove = []

    for idx, row in enumerate(tech_array):
        # Identify the technology name from the 'technology name' column
        # in the data using a regex set up to match any text '.+?' that
        # appears before the first occurrence of a space followed by a
        # 2 and three other numbers (i.e., 2009 or 2035)
        tech_name = re.search(r'.+?(?=\s2[0-9]{3})', row['technology name'])

        # If the technology name regex returned a match, check if there
        # is a match for a linear fluorescent lighting technology; in
        # either case (either the linear fluorescent or the more
        # generic technology name regex), if the match is not the same
        # as the name passed to the function, remove the row
        if tech_name:
            # Test whether the technology name corresponds to a linear
            # fluorescent lighting technology in the format 'T# F##',
            # e.g., 'T8 F96', and if it does, extract just that string
            # without any additional text (e.g., 'T8 F96 High Output')
            lfl_tech_name = re.search('^(T[0-9] F[0-9]{2})',
                                      tech_name.group(0))
            if lfl_tech_name:
                if lfl_tech_name.group(0) != specific_name:
                    rows_to_remove.append(idx)
            elif tech_name.group(0) != specific_name:
                rows_to_remove.append(idx)
        # If there's no match, the technology might not have a year
        # included as part of its name, but it nonetheless should be
        # checked to see if it matches the name passed to the function
        # and removed if there is not a match
        elif row['technology name'] != specific_name:
            rows_to_remove.append(idx)
        # Else check to see if the description indicates a placeholder
        # row, which should be deleted before the technologies are
        # summarized and returned from this function
        elif re.search('placeholder', row['technology name']):
            rows_to_remove.append(idx)
        # Implicitly, if the text does not match any regex, it is
        # assumed that it does not need to be edited or removed

    # Delete the non-matching and placeholder rows
    result = np.delete(tech_array, rows_to_remove, 0)

    return result
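
# Behavior of the two regular expressions used above, shown on
# hypothetical 'technology name' strings:
#   >>> re.search(r'.+?(?=\s2[0-9]{3})', 'rooftop_AC 2020 high').group(0)
#   'rooftop_AC'
#   >>> re.search('^(T[0-9] F[0-9]{2})', 'T8 F96 High Output').group(0)
#   'T8 F96'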


def cost_perf_extractor(single_tech_array, sd_array, sd_names, years, flag):
    """Produces a dict of cost or performance data for a single technology.

    From a numpy structured array of data for a single technology
    with several rows corresponding to different performance levels,
    this function converts the reported capital costs or efficiencies
    for all of the different performance levels into a mean (called
    'typical' in the output dict) and a maximum ('best' in the output
    dict) for this technology class. Service demand data for each of
    the performance levels is used to calculate a service demand-
    weighted cost or efficiency for the 'typical' or mean case.
    A unique value is calculated and reported for each year in the
    years vector, which specifies the range of years over which the
    final data are to be output to the cost/performance/lifetime JSON.

    Args:
        single_tech_array (numpy.ndarray): Structured array of EIA
            technology characteristics data reduced to the various
            performance levels (if applicable) for a single technology
            (e.g., 'VAV_Vent' or 'comm_GSHP-heat')
        sd_array (numpy.ndarray): EIA service demand data for the entire
            microsegment associated with the specific technology that
            appears in single_tech_array
        sd_names (list): Strings describing the service demand data, with
            each entry in the list corresponding to that row in sd_array
        years (list): The range of years of interest, each as YYYY
        flag (str): String that should be either 'cost' or 'performance'
            to indicate the type of data the function is processing and
            will return

    Returns:
        A top-level dict with keys for the 'typical' and 'maximum' cost
        or performance cases, and child dicts for each case with values
        reported for each year in years. Also a list of technology
        names that didn't match between the technology cost,
        performance, and lifetime data and the service demand data.
    """
    # Using the string in the 'flag' argument, set a variable
    # for the column that contains the desired data to obtain
    # from single_tech_array
    if flag == 'cost':
        col = 'c1'
    elif flag == 'performance':
        col = 'eff'

    # Store the number of rows (different performance levels) in
    # single_tech_array and the number of years in the desired
    # range for the final data
    n_entries = np.shape(single_tech_array)[0]
    n_years = len(years)

    # Preallocate arrays for the technology cost or performance
    # and service demand data
    val = np.zeros([n_entries, n_years])
    select_sd = np.zeros([n_entries, n_years])

    # Preallocate list of non-matching technology names
    non_matching_tech_names = []

    for idx, row in enumerate(single_tech_array):
        # Determine the starting and ending column indices for the
        # desired data (cost or performance) related to the
        # technology associated with this row
        idx_st = row['y1'] - min(years)

        # Calculate end index using the smaller of either the last year
        # of 'years' or the final year of availability for that technology
        idx_en = min(max(years), row['y2']) - min(years) + 1

        # If the indices calculated above are in range, record the data
        # (cost or performance) in the calculated location(s)
        if idx_en > 0:
            if idx_st < 0:
                idx_st = 0
            val[idx, idx_st:idx_en] = row[col]

        # If the final year of availability (market exit year) for the
        # particular technology performance level corresponding to 'row'
        # is before the first year in years, do not update the service
        # demand data array used later to calculate val_mean and val_max
        if idx_en > 0:
            # The technology name from the ktek data must be updated to have
            # formatting consistent with the slightly different service
            # demand data technology names

            # Identify technology name for the current row of the ktek data
            name_from_ktek = row['technology name']

            # Truncate technology name string from technology data to
            # 44 characters since all of the string descriptions in the
            # service demand data are limited to 44 characters; there
            # is an exception for strings that have '-inch' in them,
            # which should be matched to the first n characters, where
            # n is either 43 or 48 characters depending on whether
            # '-inch' was substituted for '"' or '"'; finally
            # remove any trailing spaces that might create text
            # matching problems
            if re.search('-inch', name_from_ktek[:43]):
                length = UsefulVars().trunc_len
            else:
                length = 44
            name_from_ktek = name_from_ktek[:length].strip()
            # The number of characters to use for text matching is
            # determined when the service demand data description
            # strings are cleaned up; the substitution of '-inch' for
            # '"' will lengthen the string by four characters, thus the
            # matching should be done with 48 characters; replacing
            # '"' will reduce the length of the string by 1, thus
            # the matching should be performed using 43 characters

            # Find the matching row in service demand data by comparing
            # the row technology name to sd_names and use that index to
            # extract the service demand data and insert them into the
            # service demand array in the same row as the corresponding
            # cost data
            try:
                select_sd[idx, ] = sd_array[sd_names.index(name_from_ktek), ]
            except ValueError:
                # If no match is found, add the unmatched technology
                # name to a list
                non_matching_tech_names.append(name_from_ktek)

    # Normalize the service demand data to simplify the calculation of
    # the service demand-weighted arithmetic mean of the desired data
    # (but perform the calculation only if there is at least one
    # non-zero entry in select_sd)
    if select_sd.any():
        # Suppress any divide by zero warnings
        with np.errstate(divide='ignore', invalid='ignore'):
            # Calculate the normalized service demand
            select_sd = select_sd/np.sum(select_sd, 0)
            select_sd = np.nan_to_num(select_sd)  # Replace nan from 0/0 with 0

    # Using the normalized service demand as the weights, calculate the
    # weighted arithmetic mean for each year (each column)
    val_mean = np.sum(np.transpose(select_sd)*single_tech_array[col], 1)

    # Calculate the maximum cost or performance for each year (each
    # column of the technology data array), adjusting for differences
    # in the calculation method (the arithmetic mean calculation does
    # not take into account market entry and exit years, relying on the
    # service demand weights to zero out technologies that are not
    # available in a given year) that can occasionally lead to the
    # mean being greater than the maximum
    val_max = np.fmax(np.amax(val, 0), val_mean)

    # Build complete structured dict with 'typical' and 'best' data
    # converted into dicts themselves, indexed by year
    final_dict = {'typical': dict(zip(map(str, years), val_mean)),
                  'best': dict(zip(map(str, years), val_max))}

    return final_dict, non_matching_tech_names
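
# Worked example of the service demand weighting performed above, using
# hypothetical numbers: if a technology has two performance levels with
# costs of 10 and 20 in a given year, and those levels carry normalized
# service demand weights of 0.75 and 0.25, the 'typical' (mean) value is
# 10*0.75 + 20*0.25 = 12.5, while the 'best' value is max(10, 20) = 20.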


def life_extractor(single_tech_array, years):
    """Produces a nested dict of lifetime data for a single technology.

    From a numpy structured array with the cost, performance, and
    lifetime data for a specific technology, calculate the arithmetic
    mean lifetime and 'range', which is calculated for the residential
    data as the difference between the maximum and the mean for each
    year. This function accounts for cases where the performance levels
    for a given technology exit the market before another level enters
    with the assumption that the previous lifetime should persist until
    the next performance level enters the market.

    Args:
        single_tech_array (numpy.ndarray): Structured array of EIA
            technology characteristics data reduced to the various
            performance levels (if applicable) for a single technology
            (e.g., 'VAV_Vent' or 'comm_GSHP-heat')
        years (list): The range of years of interest, each as YYYY

    Returns:
        A top-level dict with keys for the 'average' and 'range'
        lifetime cases, and child dicts for each case with values
        reported for each year in years.
    """
    # Store the number of rows (different performance levels) in
    # single_tech_array and the number of years in the desired
    # range for the final data
    n_entries = np.shape(single_tech_array)[0]
    n_years = len(years)

    # Preallocate arrays for the lifetime data
    life = np.zeros([n_entries, n_years])

    for idx, row in enumerate(single_tech_array):
        # Determine the starting and ending column indices for the
        # lifetime of the technology performance level in this row
        idx_st = row['y1'] - min(years)

        # Calculate end index using the smaller of either the last year
        # of 'years' or the final year of availability for that technology
        idx_en = min(max(years), row['y2']) - min(years) + 1

        # If the indices calculated above are in range, record the
        # lifetime in the calculated location(s)
        if idx_en > 0:
            if idx_st < 0:
                idx_st = 0
            life[idx, idx_st:idx_en] = row['Life']

    # Calculate the mean lifetime for each column, excluding 0 values
    with warnings.catch_warnings():
        # In cases where a particular technology does not have a
        # performance level defined as available for a given year,
        # excluding 0 values leaves nothing on which to calculate a
        # mean, which triggers a RuntimeWarning that is suppressed
        # here using the warnings package
        warnings.simplefilter("ignore", category=RuntimeWarning)
        life_mean = np.apply_along_axis(
            lambda v: np.mean(v[np.nonzero(v)]), 0, life)

    # In the special case where no performance level is given because
    # the product exits the market before the first year in the 'years'
    # vector, make the entire reported mean lifetime equal to 0 in each
    # year using the life array, which should still be populated with
    # only zeros
    if np.all(np.isnan(life_mean)):
        life_mean = np.mean(life, 0)
    # In the special case where there were years with no performance
    # level indicated in the 'life' array, the mean will be 'nan'; it
    # is assumed that the previous technology's lifetime persists until
    # the next performance level enters the market
    elif np.any(np.isnan(life_mean)):
        # First, identify the numeric values reported
        numbers = life_mean[~np.isnan(life_mean)]

        # Then, generate a vector that, for each entry in the final
        # life_mean vector, has the index for the appropriate number
        # to be pulled from the 'numbers' vector
        indices = np.cumsum(~np.isnan(life_mean)) - 1

        # Use the indices and the numbers to adjust life_mean
        life_mean = numbers[indices]

    # Calculate the lifetime range in each column using the same method
    # as mseg_techdata.py (note that this quantity is not related to
    # any statistical definitions of range) and account for any cases
    # where life_mean was just adjusted to be non-zero in some years
    life_range = np.fmax(np.amax(life, 0), life_mean) - life_mean

    # Build complete structured dict with 'average' and 'range' data
    # converted into dicts that are indexed by year
    final_dict = {'average': dict(zip(map(str, years), life_mean)),
                  'range': dict(zip(map(str, years), life_range))}

    return final_dict
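
# A minimal sketch of the nan 'forward fill' performed above, with a
# hypothetical life_mean vector in which the middle two years have no
# performance level available:
#   >>> life_mean = np.array([15., np.nan, np.nan, 18.])
#   >>> numbers = life_mean[~np.isnan(life_mean)]       # array([15., 18.])
#   >>> indices = np.cumsum(~np.isnan(life_mean)) - 1   # array([0, 0, 0, 1])
#   >>> numbers[indices]
#   array([ 15.,  15.,  15.,  18.])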


def tech_names_extractor(tech_array):
    """Creates a list of unique technology "names" for a microsegment.

    Text strings are used to identify which cost, performance, and
    lifetime data are associated with what technologies. This function
    identifies appropriate text strings to use for each of the
    technologies in a single microsegment.

    Args:
        tech_array (numpy.ndarray): EIA technology characteristics
            data available for a single microsegment, including cost,
            performance, and lifetime data for (typically multiple)
            performance scenarios for each technology applicable to
            that microsegment.

    Returns:
        A list of strings, where each string represents a technology
        "name" or descriptor that does not include scenario-specific
        details like "2020 high" or "2009 installed base".
    """
    # Create empty list to be populated with technology names
    technames = []

    for row in tech_array:
        # Identify the technology name from the 'technology name' column
        # in the data using a regex set up to match any text '.+?' that
        # appears before the first occurrence of a space followed by a
        # 2 and three other numbers (e.g., 2009 or 2035)
        tech_name = re.search(r'.+?(?=\s2[0-9]{3})', row['technology name'])

        # If the regex matched, check the matching text to see if it
        # corresponds to a linear fluorescent lighting technology
        # represented in the format 'T# F##', e.g., 'T8 F96'; if it does,
        # extract from the match just the 'T# F##' string without any
        # additional modifier text (e.g., 'T8 F96 High Output'); if not,
        # add the text that matched originally, which describes the
        # technology without scenario-specific text like '2003 installed
        # base', to the technames list
        if tech_name:
            lfl_tech_name = re.search('^(T[0-9] F[0-9]{2})',
                                      tech_name.group(0))
            if lfl_tech_name:
                technames.append(lfl_tech_name.group(0))
            else:
                technames.append(tech_name.group(0))
        # Else, if the technology name is not from a placeholder row,
        # add the entire name text to the technames list
        else:
            if not re.search('placeholder', row['technology name']):
                technames.append(row['technology name'])

    # Reduce the list to only the unique entries
    technames = list(np.unique(technames))

    return technames


def cost_conversion_factor(sf_data, sd_data, sel, years):
    """Obtain factors to change cost data from service demand to sq ft basis.

    Equipment capital costs provided in the AEO data are given in
    dollars per unit of service demand. These data must be converted
    to a per square foot floor area basis to provide a usable baseline
    for measure competition. This function calculates the conversion
    factors that can be used to transform the units of the technology
    cost data for each year for a given microsegment.

    Args:
        sf_data (numpy.ndarray): Imported EIA data including square
            footage data as a function of census division and
            building type. (Includes the full data file contents.)
        sd_data (numpy.ndarray): Imported EIA service demand data specified
            over the same efficiency levels for each technology.
        sel (list): A list of integers indicating the microsegment.
        years (list): A list of integers representing the range of years
            in the data, precalculated for speed.

    Returns:
        A numpy array of scaling factors corresponding to the specified
        microsegment for converting the technology/product baseline
        costs from a per unit service demand to a per square foot
        basis, specified for each year.
    """
    # Extract the square footage data for existing and new buildings
    # for the microsegment identified by 'sel'
    sqft_surv = cm.catg_data_selector(sf_data, sel, 'SurvFloorTotal', years)
    sqft_new = cm.catg_data_selector(sf_data, sel, 'CMNewFloorSpace', years)

    # Calculate the total square footage for each year by joining
    # the square footage on a common year vector and then summing
    # the two columns; delete the extraneous column and rename the
    # remaining column to represent the data recorded
    sqft = recfn.join_by(
        'Year', sqft_surv, sqft_new, jointype='outer', usemask=False)
    sqft['Amount1'] = sqft['Amount1'] + sqft['Amount2']
    sqft = sqft[['Year', 'Amount1']]
    sqft.dtype.names = 'Year', 'Total'

    # Include only the square footage data that corresponds to the
    # years included in the 'years' list
    sqft = [row['Total'] for row in sqft if int(row['Year']) in years]

    # Extract the service demand data applicable to the specified
    # census division, building type, and end use
    sd_cut = sd_data[np.all([sd_data['r'] == sel[0],
                             sd_data['b'] == sel[1],
                             sd_data['s'] == sel[2]], axis=0)]

    # In the service demand data, each year of data is represented by
    # a separate column; obtain the total service demand for each year
    # in the list of years provided by summing the columns of the
    # reduced service demand array
    sd = np.sum(
        sd_cut[list(map(str, years))].view(('<f8', len(years))), axis=0)

    # For end uses other than lighting and ventilation, service demand
    # is given as 1e12 BTU, which requires dividing by 1e3 to get the
    # conversion factor such that it will yield $/ft^2 floor when
    # multiplied by the baseline costs as coded in the data files
    if sel[2] in [1, 2, 3, 5, 7]:
        conv_factors = sd/sqft/1000
    else:
        conv_factors = sd/sqft

    return conv_factors
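
# Illustrative arithmetic with hypothetical values: for a heating
# microsegment (sel[2] == 1) with total service demand sd = [600., 630.]
# and total floor area sqft = [300., 310.] in two successive years, the
# conversion factors are [600/300/1000, 630/310/1000], i.e. roughly
# [0.002, 0.00203], which rescale the '2013$/kBTU out/hr' baseline costs
# to a '2013$/ft^2 floor' basis when multiplied through.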


def tpp_handler(tpp_data, sel, years):
    """Extracts and restructures time preference premium data for an end use.

    Time preference premium data are specific to a year and end use and
    are available for the main end uses in commercial buildings:
    heating, cooling, water heating, ventilation, cooking, lighting,
    and refrigeration. The data are given as the fraction of the total
    population of building owners/customers with a particular time
    preference (related to their discount rate) and the particular time
    preferences for each subset of the population. These two parameters
    are recorded as the 'population fraction' and 'time preference',
    respectively.

    Args:
        tpp_data (numpy.ndarray): A numpy structured array of the time
            preference premium data.
        sel (list): A list of integers indicating the microsegment.
        years (list): A list of integers representing the range of years
            in the data, precalculated for speed.

    Returns:
        A dict with years as keys and lists of numbers as values for
        both the population fractions and corresponding time preferences
        for each year of data indicated in 'years'. These dicts are
        rolled up into a master dict with keys for each of the types of
        data.
    """
    # From the number for the end use given in 'sel', do reverse
    # lookups in the respective translation dicts to obtain the
    # string that should be used to select time preference data
    # NOTE - reverse lookups on dicts are not typical and can
    # be unstable because unique keys can have identical values,
    # though for the particular dicts used here, there should not
    # be a problem
    end_use_num = sel[2]
    end_use_dict_loc = list(
        cm.CommercialTranslationDicts().endusedict.values()).index(end_use_num)
    end_use_json_str = list(
        cm.CommercialTranslationDicts().endusedict.keys())[end_use_dict_loc]
    end_use_kprem_string = UsefulDicts().kprem_endusedict[end_use_json_str]

    # Obtain the time preference data associated with the end use
    # extracted from the dict lookup
    tpp_subset = tpp_data[tpp_data['End Use'] == end_use_kprem_string]

    # Initialize dicts for the population fraction/proportion data and
    # corresponding time preferences
    proportion_dict = {}
    time_prefs_dict = {}

    # For each year in the data, extract the applicable population
    # fractions and time preferences and, if values are reported for both
    # parameters for that year, add them to their respective dicts using
    # the current year as the key; time preference premiums generally
    # do not vary by year, but they are included by year for completeness
    for yr in years:
        population_frac = tpp_subset[tpp_subset['Year'] == yr]['Proportion']
        premiums = tpp_subset[tpp_subset['Year'] == yr]['Time Pref Premium']

        # If any data are found/present, add to dict
        if population_frac.any() and premiums.any():
            proportion_dict[str(yr)] = list(population_frac)
            time_prefs_dict[str(yr)] = list(premiums)

    # Combine into one the separate dicts for the parameters of interest
    combined_dict = {'time preference': time_prefs_dict,
                     'population fraction': proportion_dict}

    return combined_dict
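
# Sketch of the reverse lookup chain above, assuming the endusedict in
# com_mseg.py maps 'heating' to the numeric end use code 1: for
# sel[2] == 1, end_use_json_str resolves to 'heating' and
# end_use_kprem_string to 'Space Heating', the string used in kprem.txt.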


def mseg_technology_handler(tech_data, sd_data, tpp_data, sf_data, sel, years):
    """Restructures cost, performance, lifetime, and time preference data.

    Using external functions that process and reformat specific
    categories of data from the EIA source data arrays, this function
    converts the cost, performance, and lifetime data for each
    technology within a particular microsegment to a dict format that
    is consistent with the residential technology data. Those data for
    each technology are then added to a master dict that ultimately
    includes all of the technologies in the microsegment. In addition,
    the time preference premium data associated with each end use and
    year are added to the master dict for the microsegment.

    This function is called for each terminal, or leaf, node in the
    microsegments JSON database that governs the structure of the major
    project input files that are based on EIA Annual Energy Outlook
    data. Each of those leaf nodes corresponds to a single, unique
    microsegment. The dict returned by this function to be placed at
    the leaf node includes the data for all of the technologies
    applicable to that microsegment.

    This function is relevant to all microsegments with a numeric end
    use code <= 7 (i.e., all end uses except for PCs, non-PC office
    electronics, and "other").

    Args:
        tech_data (numpy.ndarray): Imported EIA technology characteristics
            data, with multiple efficiency levels for each technology,
            including technology cost, performance, and service lifetime.
        sd_data (numpy.ndarray): Imported EIA service demand data specified
            over the same efficiency levels for each technology.
        tpp_data (numpy.ndarray): A numpy structured array of the
            EIA commercial market time preference premium data.
        sf_data (numpy.ndarray): Imported EIA data including square
            footage data as a function of census division and
            building type. (Includes the full data file contents.)
        sel (list): A list of integers indicating the microsegment.
        years (list): A list of integers representing the range of years
            in the data, precalculated for speed.

    Returns:
        A dict that specifies the cost, performance, and lifetime on
        a technology-specific basis for all of the technologies in the
        microsegment indicated by the 'sel' argument. Also a list of
        the technology names in the microsegment that did not match
        between the cost, performance, and lifetime data and the
        service demand data.
    """
    # Instantiate a master dict for this microsegment
    complete_mseg_tech_data = {}

    # From the imported EIA data, extract the technology and service
    # demand data for the microsegment identified by 'sel'
    filtered_tech_data = tech_data_selector(tech_data, sel)
    (filtered_sd_data, sd_names_list) = sd_data_selector(sd_data, sel, years)

    # Use the 'units_id' function to extract the performance units for
    # the microsegment specified by 'sel' (the same function can also
    # provide units for costs if they have not yet been converted to
    # a per square foot floor area basis)
    the_performance_units = units_id(sel, 'performance')

    # Obtain the cost conversion factors (by year) for this microsegment
    conv_factors = cost_conversion_factor(sf_data, sd_data, sel, years)

    # Identify the names (as strings) of all of the technologies
    # included in this microsegment
    tech_names_list = tech_names_extractor(filtered_tech_data)

    # Preallocate a list of non-matching technology names for this
    # microsegment
    mseg_non_matching_names = []

    # Extract the cost, performance, and lifetime data for each
    # technology in this microsegment, insert those data into a dict
    # with the correct structure, and append that dict to the master
    # dict for this microsegment
    for tech in tech_names_list:
        # Extract the cost, performance, and lifetime data specific
        # to a single technology, given by 'tech'
        single_tech_data = single_tech_selector(filtered_tech_data, tech)

        # Extract the cost data in a dict format with 'typical' and
        # 'best' cost cases
        the_cost, cost_non_matching_names = cost_perf_extractor(
            single_tech_data,
            filtered_sd_data,
            sd_names_list,
            years, 'cost')

        # Update the cost data with the conversion factor from $/SD
        # (where SD = service demand) to $/ft^2 for both the 'typical'
        # and 'best' cases, then add the units and data source to
        # complete the dict for this technology
        the_cost['typical'] = dict(zip(
            sorted(the_cost['typical'].keys()),
            sorted(the_cost['typical'].values())*conv_factors))
        the_cost['best'] = dict(zip(
            sorted(the_cost['best'].keys()),
            sorted(the_cost['best'].values())*conv_factors))
        the_cost['units'] = '2013$/ft^2 floor'
        the_cost['source'] = 'EIA AEO'

        # Extract the performance data, restructure into the appropriate
        # dict format, and append the units and data source
        the_perf, _ = cost_perf_extractor(
            single_tech_data,
            filtered_sd_data,
            sd_names_list,
            years, 'performance')
        the_perf['units'] = the_performance_units
        the_perf['source'] = 'EIA AEO'

        # Extract the lifetime data, restructure into the appropriate
        # dict format, and append the units and data source
        the_life = life_extractor(single_tech_data, years)
        the_life['units'] = 'years'
        the_life['source'] = 'EIA AEO'

        # Following the format used for the residential data, combine
        # the cost, performance, and lifetime data for the technology
        # identified by the variable 'tech' into a single dict
        tech_data_dict = {'installed cost': the_cost,
                          'performance': the_perf,
                          'lifetime': the_life}

        # Add the data for this technology to the master dict for the
        # entire microsegment (limit the technology name length to
        # no more than 43 characters to match the stock and energy data)
        complete_mseg_tech_data[tech[:43]] = tech_data_dict

        # If there were any non-matching names identified, replace the
        # preallocated empty list with the list of non-matching names;
        # note that only the non-matching names from the cost case are
        # included here since the list of names will be the same for
        # either the cost or performance data extraction
        if cost_non_matching_names:
            mseg_non_matching_names = cost_non_matching_names

    # Add time preference premium data for the current end use
    # to the complete dict with all of the technology cost,
    # performance, and lifetime data added
    complete_mseg_tech_data['consumer choice'] = tpp_handler(
        tpp_data, sel, years)

    return complete_mseg_tech_data, mseg_non_matching_names
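
# Sketch of the structure returned for a microsegment (hypothetical
# technology name, years, and values):
#   complete_mseg_tech_data = {
#       'rooftop_AC': {
#           'installed cost': {'typical': {'2014': 1.2, ...},
#                              'best': {'2014': 2.3, ...},
#                              'units': '2013$/ft^2 floor',
#                              'source': 'EIA AEO'},
#           'performance': {...},
#           'lifetime': {...}},
#       'consumer choice': {'time preference': {...},
#                           'population fraction': {...}}}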


def walk(tech_data, serv_data, tpp_data, db_data, years, json_db,
         key_list=[], no_match_names=[]):
    """Recursively explore the JSON structure and add the appropriate data.

    Note that this walk function and the data processing function
    ('mseg_technology_handler') are set up slightly differently than in
    com_mseg. Here, the json_interpreter function is called before the
    data processing function and the numeric indices are passed to the
    function, rather than sending the list of keys from the JSON to
    that function and calling json_interpreter within the function.

    Args:
        tech_data (numpy.ndarray): A numpy structured array of the
            EIA technology data, including the cost, performance,
            and lifetime of individual technologies.
        serv_data (numpy.ndarray): A numpy structured array of the
            EIA service demand data.
        tpp_data (numpy.ndarray): A numpy structured array of the
            EIA commercial market time preference premium data.
        db_data (numpy.ndarray): An array of commercial building data,
            including total energy use by end use/fuel type and all
            MELs types, new and surviving square footage, and other
            parameters. Square footage data are specified as a
            function of census division and building type.
        years (list): A list of the years (YYYY) of data to be converted.
        json_db (dict): The nested dict structure of the empty or
            partially complete database to be populated with new data.
        key_list (list): The list of keys that define the current
            location in the database structure.
        no_match_names (list): A list of names of technologies found in
            the cost, performance, and lifetime data, but not in the
            service demand data.

    Returns:
        A complete and populated dict structure for the JSON database,
        and a list of all technology names that did not find a match.
    """
    # Explore data structure from current level
    for key, item in json_db.items():
        # If there are additional levels in the dict, call the function
        # again to advance another level deeper into the data structure
        if isinstance(item, dict):
            walk(tech_data, serv_data, tpp_data, db_data,
                 years, item, key_list + [key])
        # If a leaf node has been reached, check if the second entry in
        # the key list is one of the recognized building types and that
        # there are more than two total keys present (to exclude square
        # footage leaf nodes), and if so, finish constructing the key
        # list for the current location and obtain the data to update
        # the dict
        else:
            cd = cm.CommercialTranslationDicts()  # Shortens if statement below
            if key_list[1] in cd.bldgtypedict.keys() and len(key_list) > 2:
                leaf_node_keys = key_list + [key]

                # Convert keys into integers that define the microsegment
                mseg_codes = cm.json_interpreter(leaf_node_keys)

                # Skip all demand microsegments and end uses coded > 7
                if 'demand' not in leaf_node_keys and mseg_codes[2] <= 7:
                    # Extract data from original data sources
                    data_dict, non_matching_names = mseg_technology_handler(
                        tech_data, serv_data, tpp_data, db_data,
                        mseg_codes, years)

                    # Set dict key to extracted data
                    json_db[key] = data_dict

                    # If non-matching names are identified, add them to
                    # the existing list of non-matched technology names
                    if non_matching_names:
                        no_match_names.extend(non_matching_names)

    return json_db, no_match_names
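
# Typical (hypothetical) top-level call, mirroring how the analogous walk
# in com_mseg.py is driven from a main routine:
#   >>> json_db, no_match_names = walk(tech_data, serv_data, tpp_data,
#   ...                                db_data, years, json_db)
# where json_db is the nested dict loaded from the input JSON database;
# leaf nodes for qualifying microsegments are replaced with the dicts
# produced by mseg_technology_handler.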


def kprem_import(data_file_path, dtype_list, hl):
    """Import data and convert to a numpy structured array.

    Read the contents of the time preference premium data file and
    convert them into a numpy structured array. This function is
    distinct from the function used to import other EIA data because
    the formatting of the time preference premium data (kprem) is
    different. In particular, not all of the lines in the data are
    the same length and there are empty lines separating the data
    visually that are not needed when imported.

    Args:
        data_file_path (str): The full path to the data file to be imported.
        dtype_list (list): A list of tuples with each tuple containing two
            entries, a column heading string, and a string defining the
            data type for that column. Formatted as a numpy dtype list.
        hl (int): The number of header lines to skip from the top of
            the file before reading data.

    Returns:
        A numpy structured array of the imported data file with the
        columns specified by dtype_list.
    """
    # Open the target CSV formatted data file
    with open(data_file_path) as thefile:
        # Open the file contents as a csv reader object
        filecont = csv.reader(thefile, delimiter='\t')

        # Create list to be populated with tuples for each row of data
        # from the data file
        data = []

        # Skip the specified number of header lines in the file
        for i in range(0, hl):
            next(filecont)

        # Record data type length for later repeated reference
        dtypelen = len(dtype_list)

        # Import the data, reconstructing the line if it is missing data
        for row in filecont:
            rowlen = len(tuple(row))  # Record current row length

            # If the current row and the data type lengths match,