Prado EFO PVA
rule_fcst_par_conc_futrs_notdone.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 Created on Mon May 18 19:48:52 2020
4 
5 @author: cd
6 """
7 
8 
9 import abc
10 import numpy as np
11 from efo.rule import RuleBase, RuleRlsSpecified
12 from efo.junction import ReservoirJunction
13 # import multiprocessing as mp
14 # from multiprocessing import Pool as p
15 import concurrent.futures
16 # from collections import deque
17 
18 # TODO: Numpy array variable should be passed as reference or pointer
19 # TODO: Function arguments should be labeled "ref" or "ptr" at beginning for clarification
20 # TODO: Function argments that are array copies should be labeled "cpy"
21 
22 
23 class RuleFcstBase(RuleBase):
24  def __init__(self, name, timeFcst):
25  # Call super class constructor
26  super().__init__(name, timeFcst)
27  # self.T = timeFcst
28  self.releaserelease = np.full(self.T.Tcont.nSteps, np.nan)
29  self.releaseSchedreleaseSched = np.empty(self.T.nSteps)
30 
31  def calc_release(self, rlsProposed, rlsUnCtrl, stor, rlsPrev=None, qIn=None):
32  rlsProposed = super().calc_release(
33  rlsProposed, rlsPrev=rlsPrev, rlsUnCtrl=rlsUnCtrl, stor=stor, qIn=qIn)
34  # TODO: Check to see if the current date is a forecast date
35  self.releaseSchedreleaseSched = self.calc_release_fcstcalc_release_fcst(rlsProposed, rlsUnCtrl, stor, qIn)
36  return self.releaseSchedreleaseSched[1]
37 
38  @abc.abstractmethod
39  def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn):
40  # TODO: This should return a tuple that has the release for the current time step
41  # but also the release schedule and possilby the max and min?
42  # TODO: In the world of OOP this should be a protected method because we only
43  # want to call this from other forecast rules, but in python not sure.
44  pass
45 
46  @classmethod
47  def __subclasshook__(cls, C):
48  if cls is RuleFcstBase:
49  attrs = set(dir(C))
50  if set(cls.__abstractmethods__) <= attrs:
51  return True
52  return NotImplemented
53 
54 
56  def __init__(self, name, timeFcst, efoResrvr, storTh, riskTol, constants, *,
57  efoNetwork=None, ruleRlsSpecified=None, efoRlsSched=None):
58  # Call super class constructor
59  super().__init__(name, timeFcst)
60  # Set sub class properties
61  self.efoNetworkefoNetwork = efoNetwork
62  self.efoResrvrefoResrvr = efoResrvr
63  self.ruleRlsSpecifiedruleRlsSpecified = ruleRlsSpecified
64  self.efoRlsSchedefoRlsSched = efoRlsSched
65  self.nMembersnMembers = len(efoResrvr)
66  # TODO: This shoud be a storage guide curve rule for flexibility
67  self.storThstorTh = float(storTh)
68  self.riskTolriskTol = riskTol
69  self.constantsconstants = constants
70  self.concTaskerconcTasker = ConcurrentTasker()
71 
72  def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn=None):
73  # Set initial conditions of the ensemble networks to the current observed
74  # TODO: Need to create efo network that has the primary network and the ensembles
75  # TODO: Should this have max and min release properties for easier access for results?
76  if self.efoNetworkefoNetwork is not None:
77  # TODO: Here it should just return a copy with the data for today's forecast
78  # TODO: This will avoid pickling all of the data in parallel processing
79  # TODO: May need to add a copy method to allow this
80  for net in self.efoNetworkefoNetwork: net.set_init_cond()
81  # Initialize variables
82  t = self.T.get_fcst_time()
83  # self.storFcst = np.empty(t.nSteps, self.nMembers, t.nSteps - 1)
84  # self.storFcst[:] = np.nan
85  self.storFcststorFcst = np.full((t.nSteps, self.nMembersnMembers, t.nSteps-1), np.nan)
86  # self.storFcst[0, :, :] = [curRes.stor[0] for curRes in self.efoResrvr]
87  self.storFcststorFcst[0, :, :] = self.efoResrvrefoResrvr[0].stor[0]
88  # self.rlsFcst = np.full((t.nSteps, self.nMembers, t.nSteps - 1), np.nan)
89  self.rlsUnCtrlFcstrlsUnCtrlFcst = np.full((t.nSteps, self.nMembersnMembers, t.nSteps-1), np.nan)
90  self.rlsUnCtrlFcstrlsUnCtrlFcst[0, :, :] = self.efoResrvrefoResrvr[0].rlsUnCtrl[0]
91  self.rlsFcstrlsFcst = np.zeros((t.nSteps, self.nMembersnMembers, t.nSteps-1))
92  for i in range(t.nSteps-1):
93  self.rlsFcstrlsFcst[i+2:, 0:self.nMembersnMembers, i] = np.nan
94  # self.rlsFcst[0, :, :] = [curRes.rlsCtrl[0] for curRes in self.efoResrvr]
95  # TODO: Maybe use the passed rlsPrev here?
96  self.rlsFcstrlsFcst[0, :, :] = self.efoResrvrefoResrvr[0].rlsCtrl[0]
97  self.rlsMaxrlsMax = np.full((t.nSteps, self.nMembersnMembers, t.nSteps-1), np.nan)
98  self.rlsNoConstraintrlsNoConstraint = np.copy(self.rlsFcstrlsFcst)
99  self.prExcThprExcTh = np.full(t.nSteps, np.nan)
100  self.prExcThPreRlsprExcThPreRls = np.copy(self.prExcThprExcTh)
101  self.iRiskyMbrsiRiskyMbrs = np.full((t.nSteps, self.nMembersnMembers), False)
102  riskySelect = np.arange(0, self.nMembersnMembers, dtype=int)
103  riskyMbr = np.copy(riskySelect)
104  iRisky = np.full(self.nMembersnMembers, False)
105  iRiskyChk = np.full(self.nMembersnMembers, True)
106  volAbvThresh = np.zeros([t.nSteps, self.nMembersnMembers])
107  vol2Rls = np.zeros([t.nSteps, self.nMembersnMembers])
108  rlsFcst = np.full(t.nSteps, np.nan)
109  rlsFcst[0] = self.efoResrvrefoResrvr[0].rlsCtrl[0]
110  rlsMax = np.full(t.nSteps, np.nan)
111  rlsToday = np.empty((t.nSteps-1, self.nMembersnMembers))
112  rlsSched = np.zeros(t.nSteps)
113  for tsHoriz in t.steps[1:]:
114  self.T.step = tsHoriz
115  if tsHoriz > 1:
116  self.storFcststorFcst[:, :, tsHoriz-1] = self.storFcststorFcst[:, :, tsHoriz-2]
117  self.rlsUnCtrlFcstrlsUnCtrlFcst[:, :, tsHoriz-1] = self.rlsUnCtrlFcstrlsUnCtrlFcst[:, :, tsHoriz-2]
118  future2ensMbr = dict()
119  with concurrent.futures.ProcessPoolExecutor(max_workers=6) as executor:
120  for j in range(self.nMembersnMembers):
121  # TODO: Maybe shouldn't set the override to 0 here?
122  if issubclass(type(self.ruleRlsSpecifiedruleRlsSpecified[riskySelect[j]]), RuleRlsSpecified):
123  self.ruleRlsSpecifiedruleRlsSpecified[riskySelect[j]].set_release(tsHoriz, 0)
124  # self.futures.append(executor.submit(self.efoResrvr[j].calc_qout()))
125  # futures.append(executor.submit(self.efoResrvr[j].calc_qout()))
126  future2ensMbr.update( {executor.submit(self.efoResrvrefoResrvr[j].calc_qout()): j} )
127  # self.efoResrvr[j].calc_qout()
128  # TODO: In a network this should just process up to the efoReseroir...
129  # TODO: and not points downstream to save compute time
130  for curFuture in concurrent.futures.as_completed(future2ensMbr):
131  curMbr = future2ensMbr[curFuture]
132  self.storFcststorFcst[tsHoriz, riskySelect[curMbr], tsHoriz-1] = self.efoResrvrefoResrvr[curMbr].stor[tsHoriz]
133  self.rlsUnCtrlFcstrlsUnCtrlFcst[tsHoriz, riskySelect[curMbr], tsHoriz-1] = \
134  self.efoResrvrefoResrvr[curMbr].rlsUnCtrl[tsHoriz]
135  # while futures:
136  # future = futures.popleft()
137  # if future.exception():
138  # continue
139  # elif future.done():
140  # self.storFcst[tsHoriz, riskySelect[j], tsHoriz-1] = self.efoResrvr[j].stor[tsHoriz]
141  # self.rlsUnCtrlFcst[tsHoriz, riskySelect[j], tsHoriz-1] = \
142  # self.efoResrvr[j].rlsUnCtrl[tsHoriz]
143  risky = True
144  while risky:
145  storPlusUnCtrl = self.storFcststorFcst[tsHoriz, :, tsHoriz-1]\
146  + self.rlsUnCtrlFcstrlsUnCtrlFcst[tsHoriz, :, tsHoriz-1]*self.constantsconstants.cfs2af
147  iRisky = (storPlusUnCtrl > self.storThstorTh) & iRiskyChk
148  self.prExcThprExcTh[tsHoriz] = np.sum(iRisky)/self.nMembersnMembers
149  if self.prExcThprExcTh[tsHoriz] > self.riskTolriskTol[tsHoriz]:
150  self.prExcThPreRlsprExcThPreRls[tsHoriz] = self.prExcThprExcTh[tsHoriz]
151  iRiskyChk = np.copy(iRisky)
152  # riskyMbr = np.where(self.storFcst[tsHoriz, :, tsHoriz-1] > self.storTh)[0]
153  riskyMbr = np.where(iRisky)[0]
154  nMbrs2Reduce = int(len(riskyMbr) - np.floor(self.riskTolriskTol[tsHoriz]*self.nMembersnMembers))
155  volAbvThresh[tsHoriz, riskyMbr] = \
156  self.storFcststorFcst[tsHoriz, riskyMbr, tsHoriz-1]\
157  + sum(self.rlsUnCtrlFcstrlsUnCtrlFcst[1:tsHoriz+1, riskyMbr, tsHoriz-1])*self.constantsconstants.cfs2af\
158  - self.storThstorTh
159  mbrSorted = np.argsort(volAbvThresh[tsHoriz, riskyMbr])
160  riskySelect = riskyMbr[mbrSorted[0:nMbrs2Reduce]]
161  self.iRiskyMbrsiRiskyMbrs[tsHoriz, riskySelect] = True
162  # TODO: Should add a while loop so you can account for evaportion iterations
163  # TODO: This percent increase (1.01) should be a property
164  # vol2Rls[tsHoriz, riskySelect] = \
165  # 1.01*(self.storFcst[tsHoriz, riskySelect, tsHoriz-1] - self.storTh)
166  vol2Rls[tsHoriz, riskySelect] = 1.01*volAbvThresh[tsHoriz, riskySelect]
167  tasks = []
168  for j in range(len(riskySelect)):
169  rlsFcst[1:tsHoriz+1] = np.sum(
170  vol2Rls[:, riskySelect[j]])*self.constantsconstants.af2cfs/tsHoriz
171  self.rlsNoConstraintrlsNoConstraint[1:tsHoriz+1, riskySelect[j], tsHoriz-1] = np.copy(rlsFcst[1:tsHoriz+1])
172  rlsFcst[1:tsHoriz+1], rlsMax[1:tsHoriz+1] = self._calc_release_sched_calc_release_sched(
173  rlsFcst[1:tsHoriz+1], tsHoriz, riskySelect[j])
174  self.rlsMaxrlsMax[1:tsHoriz+1, riskySelect[j], tsHoriz-1] = np.copy(rlsMax[1:tsHoriz+1])
175  if np.sum(rlsFcst[1:tsHoriz+1])*self.constantsconstants.cfs2af < np.sum(vol2Rls[:, riskySelect[j]]):
176  iRiskyChk[riskySelect[j]] = False
177  vol2Rls[tsHoriz, riskySelect[j]] = \
178  vol2Rls[tsHoriz, riskySelect[j]] - \
179  (np.sum(vol2Rls[:, riskySelect[j]]) - np.sum(rlsFcst[1:tsHoriz+1])*self.constantsconstants.cfs2af)
180  self.rlsFcstrlsFcst[1:tsHoriz+1, riskySelect[j], tsHoriz-1] = np.copy(rlsFcst[1:tsHoriz+1])
181  risky = True
182  else:
183  if np.isnan(self.prExcThPreRlsprExcThPreRls[tsHoriz]):
184  self.prExcThPreRlsprExcThPreRls[tsHoriz] = self.prExcThprExcTh[tsHoriz]
185  # TODO: Create a function that initializes these variables, would have to return...
186  # TODO: a tuple of multiple variables. Not sure this would save any code redundancy
187  riskySelect = np.arange(0, self.nMembersnMembers, dtype=int)
188  riskyMbr = np.copy(riskySelect)
189  iRiskyChk = np.full(self.nMembersnMembers, True)
190  risky = False
191  rlsToday[:, :] = self.rlsFcstrlsFcst[1, :, :].T
192  if np.any(rlsToday[:] > 0):
193  # TODO: You could have a count of the number of ens mbrs and ts that have the same day 1 release
194  # TODO: This could be used to provide multiple release schedules
195  tsMax, mbrMax = np.where(rlsToday==np.max(rlsToday))
196  # TODO: Does this need to be set since you return the release schedule and it is reset by super class?
197  self.releaserelease[self.T.Tcont.step] = self.rlsFcstrlsFcst[1, mbrMax[0], tsMax[0]]
198  rlsSched = self.rlsFcstrlsFcst[:, mbrMax[0], tsMax[0]]
199  else:
200  # TODO: Does this need to be set since you return the release schedule and it is reset by super class?
201  self.releaserelease[self.T.Tcont.step] = 0
202  return rlsSched
203 
204  def _calc_release_sched(self, rlsFcst, tsHoriz, ensMbr):
205  pctConvg = 0.01
206  rlsTot = np.nansum(rlsFcst)
207  maxItr = 100
208  curItr = 0
209  # Create reference variables to ensemble reservoir objects
210  rlsMax = self.efoResrvrefoResrvr[ensMbr].rlsMax[1:tsHoriz+1]
211  rlsApplied = self.efoResrvrefoResrvr[ensMbr].rlsCtrl[1:tsHoriz+1]
212  rlsMaxPrev = np.zeros(len(rlsMax))
213  rlsFcstInit = np.copy(rlsFcst)
214  # idxZero = 1
215  while True:
216  # Recaculate storage
217  for i in range(1, tsHoriz+1):
218  if issubclass(type(self.efoResrvrefoResrvr[ensMbr]), ReservoirJunction):
219  self.T.step = i
220  if issubclass(type(self.ruleRlsSpecifiedruleRlsSpecified[ensMbr]), RuleRlsSpecified):
221  self.ruleRlsSpecifiedruleRlsSpecified[ensMbr].set_release(i, np.copy(rlsFcst[i-1]))
222  # TODO: In a network this should just process up to the efoReseroir...
223  # TODO: and not points downstream to save compute time
224  self.efoNetworkefoNetwork[ensMbr].process_fcst_junctions()
225  self.storFcststorFcst[i, ensMbr, tsHoriz-1] = np.copy(self.efoResrvrefoResrvr[ensMbr].stor[i])
226  # TODO: If you are maintaining forecasted storage levels then all future timesteps...
227  # TODO: should be at the max release schedule
228  if rlsTot - np.sum(rlsApplied) < pctConvg*rlsTot\
229  or np.all(np.absolute(rlsMaxPrev - rlsMax) < 0.01) or curItr > maxItr:
230  break
231  # zChk = np.array((rlsFcst, self.efoResrvr[ensMbr].rlsCtrl, self.storFcst[:, ensMbr, tsHoriz-1], rlsMax)).T
232  rlsFcst = self._get_adjusted_release_eqldist_get_adjusted_release_eqldist(rlsFcstInit, rlsMax)
233  rlsMaxPrev = np.copy(rlsMax)
234  curItr += 1
235  return rlsFcst, np.copy(rlsMax)
236 
237  def _get_adjusted_release_eqldist(self, rlsFcstInit, rlsMax):
238  rlsFcst = np.copy(rlsFcstInit)
239  rlsFcstCheck = np.zeros(len(rlsFcst))
240  iVal = ~np.isnan(rlsFcst) & ~np.isnan(rlsMax)
241  iMaxed = np.full(len(rlsFcst), False)
242  iMaxed[0] = True
243  # iMaxed = np.copy(iZeroStor)
244  while any(iMaxed[1:]==False) and any(rlsFcst[iVal] - rlsFcstCheck[iVal] != 0):
245  rlsFcstCheck = np.copy(rlsFcst)
246  iGrtrMax = rlsFcst > rlsMax
247  iMaxed = iMaxed | iGrtrMax
248  rlsDiffVol = np.sum(rlsFcst[iGrtrMax] - rlsMax[iGrtrMax])
249  rlsFcst[iGrtrMax] = rlsMax[iGrtrMax]
250  rlsAdd = rlsDiffVol/max(1,np.sum(~iMaxed & iVal))
251  rlsFcst[~iMaxed] += rlsAdd
252  return rlsFcst
253 
254  def _get_adjusted_release_rollback(self, rlsFcstInit, tsHoriz, rlsMax):
255  rlsFcst = np.copy(rlsFcstInit)
256  rlsFcstCheck = np.copy(rlsFcst)
257  carryOver = 0
258  for i in range(tsHoriz, 0, -1):
259  rlsFcst[i] += carryOver
260  rlsFcstCheck[i] = rlsFcst[i]
261  if rlsFcst[i] > rlsMax[i]:
262  rlsFcst[i] = rlsMax[i]
263  # Readjust carryover
264  carryOver += (rlsFcstCheck[i] - rlsFcst[i])/(i-1) if i > 1 else 0
265  return rlsFcst
266 
267 
269  def __init__(self):
270  pass
271 
272  def _calc_release_sched(self, rlsFcst, tsHoriz, ensMbr):
273  pctConvg = 0.01
274  rlsTot = np.nansum(rlsFcst)
275  maxItr = 100
276  curItr = 0
277  # Create reference variables to ensemble reservoir objects
278  rlsMax = self.efoResrvr[ensMbr].rlsMax[1:tsHoriz+1]
279  rlsApplied = self.efoResrvr[ensMbr].rlsCtrl[1:tsHoriz+1]
280  rlsMaxPrev = np.zeros(len(rlsMax))
281  rlsFcstInit = np.copy(rlsFcst)
282  # idxZero = 1
283  while True:
284  # Recaculate storage
285  for i in range(1, tsHoriz+1):
286  if issubclass(type(self.efoResrvr[ensMbr]), ReservoirJunction):
287  self.T.step = i
288  if issubclass(type(self.ruleRlsSpecified[ensMbr]), RuleRlsSpecified):
289  self.ruleRlsSpecified[ensMbr].set_release(i, np.copy(rlsFcst[i-1]))
290  # TODO: In a network this should just process up to the efoReseroir...
291  # TODO: and not points downstream to save compute time
292  self.efoNetwork[ensMbr].process_fcst_junctions()
293  self.storFcst[i, ensMbr, tsHoriz-1] = np.copy(self.efoResrvr[ensMbr].stor[i])
294  # TODO: If you are maintaining forecasted storage levels then all future timesteps...
295  # TODO: should be at the max release schedule
296  if rlsTot - np.sum(rlsApplied) < pctConvg*rlsTot\
297  or np.all(np.absolute(rlsMaxPrev - rlsMax) < 0.01) or curItr > maxItr:
298  break
299  # zChk = np.array((rlsFcst, self.efoResrvr[ensMbr].rlsCtrl, self.storFcst[:, ensMbr, tsHoriz-1], rlsMax)).T
300  rlsFcst = self._get_adjusted_release_eqldist(rlsFcstInit, rlsMax)
301  rlsMaxPrev = np.copy(rlsMax)
302  curItr += 1
303  return rlsFcst, np.copy(rlsMax)
304 
305  def _get_adjusted_release_eqldist(self, rlsFcstInit, rlsMax):
306  rlsFcst = np.copy(rlsFcstInit)
307  rlsFcstCheck = np.zeros(len(rlsFcst))
308  iVal = ~np.isnan(rlsFcst) & ~np.isnan(rlsMax)
309  iMaxed = np.full(len(rlsFcst), False)
310  iMaxed[0] = True
311  # iMaxed = np.copy(iZeroStor)
312  while any(iMaxed[1:]==False) and any(rlsFcst[iVal] - rlsFcstCheck[iVal] != 0):
313  rlsFcstCheck = np.copy(rlsFcst)
314  iGrtrMax = rlsFcst > rlsMax
315  iMaxed = iMaxed | iGrtrMax
316  rlsDiffVol = np.sum(rlsFcst[iGrtrMax] - rlsMax[iGrtrMax])
317  rlsFcst[iGrtrMax] = rlsMax[iGrtrMax]
318  rlsAdd = rlsDiffVol/max(1,np.sum(~iMaxed & iVal))
319  rlsFcst[~iMaxed] += rlsAdd
320  return rlsFcst
321 
322 
324  def __init__(self, name, rulePfo, ruleEfoNoRls):
325  # Call super class constructor
326  super().__init__(name, rulePfo.T)
327  self.rulePforulePfo = rulePfo
328  self.ruleEfoNoRlsruleEfoNoRls = ruleEfoNoRls
329  self.riskEforiskEfo = np.full((self.T.Tcont.nSteps, self.T.nSteps), 0.)
330 
331  def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn=None):
332  zeroRls = self.ruleEfoNoRlsruleEfoNoRls.calc_release_fcst(rlsProposed, rlsUnCtrl, stor, qIn)
333  self.riskEforiskEfo[self.T.Tcont.step,1:] = self.ruleEfoNoRlsruleEfoNoRls.prExcTh[1:]
334  rlsSched = self.rulePforulePfo.calc_release_fcst(rlsProposed, rlsUnCtrl, stor, qIn)
335  # TODO: Does this need to be set since you return the release schedule and it is reset by super class?
336  self.releaserelease[self.T.Tcont.step] = rlsSched[1]
337  return rlsSched
338 
339 
340 # NOT DONE
341 # TODO: This should keep track of the mapping between the fcst reservoir and the network.
343  def __init__(self, name, timeFcst, fcstNetwork):
344  # Call super class constructor
345  super().__init__(name, timeFcst)
346  self.fcstNetworkfcstNetwork = fcstNetwork
347 
348  def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor):
349  pass
350 
351 
352 
def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn)
def calc_release(self, rlsProposed, rlsUnCtrl, stor, rlsPrev=None, qIn=None)
def _get_adjusted_release_rollback(self, rlsFcstInit, tsHoriz, rlsMax)
def __init__(self, name, timeFcst, efoResrvr, storTh, riskTol, constants, *efoNetwork=None, ruleRlsSpecified=None, efoRlsSched=None)
def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn=None)
def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn=None)