Prado EFO PVA
rule_downstream.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 Created on Sat May 9 21:21:36 2020
4 
5 @author: cd
6 """
7 
8 import abc
9 import numpy as np
10 from efo.rule import RuleBase
11 from efo.time import TimeBase
12 from efo.junction import JunctionRegulated
13 from efo.qin import QinRouted
14 from efo.rule_compliance import RuleComplianceBase
15 
16 
17 class RuleDwnstrmBase(RuleComplianceBase):
18  def __init__(self, name, time, junctions):
19  self.junctionsjunctions = junctions
20  super().__init__(name, time)
21 
22  @classmethod
23  def __subclasshook__(cls, C):
24  if cls is RuleDwnstrmBase:
25  attrs = set(dir(C))
26  if set(cls.__abstractmethods__) <= attrs:
27  return True
28  return NotImplemented
29 
30 
32  # TODO: This class does not need constants so maybe not in RuleMinBase
33  def __init__(self, name, time, junctions,*, ruleType=RuleComplianceBase.MAX):
34  # Call super class constructor
35  # TODO: Error check to make sure that all are Junctions
36  super().__init__(name, time, junctions)
37  # Set class variable
38  self.ruleTyperuleType = ruleType
39  # Calc the max lag time for the junctions
40  self.maxLagmaxLag = 0
41  for curJnc in self.junctionsjunctions[1:]:
42  for curQin in curJnc.qIn:
43  if isinstance(curQin, QinRouted):
44  self.maxLagmaxLag += curQin.reachUs.lagTime
45 
46  # TODO: May not need rlsProposed here if we are just always evaluating zero release
47  def _calc_delta(self, rlsProposed):
48  qDelta = np.empty([self.maxLagmaxLag+1, len(self.junctionsjunctions[1:])])
49  for i in range(0, self.maxLagmaxLag+1):
50  self.junctionsjunctions[0].set_qout(qOutSpecified=rlsProposed, tsOffset=i)
51  for j, curJnc in enumerate(self.junctionsjunctions[1:]):
52  if isinstance(curJnc, JunctionRegulated):
53  qDelta[i, j] = curJnc.calc_delta(ruleType=self.ruleTyperuleType, tsOffset=i)
54  else:
55  qDelta[i, j] = np.nan
56  curJnc.calc_qout()
57  # TODO: Would be cool to track which junction is setting release
58  return qDelta
59 
60  def get_qcomp(self, *, rlsPrev=None, rlsUnCtrl=None, stor=None, qIn=None, tsOffset=0):
61  # super().get_qcomp(rlsPrev=rlsPrev, rlsUnCtrl=rlsUnCtrl, stor=stor, qIn=qIn, tsOffset=tsOffset)
62  qDelta = self._calc_delta_calc_delta(rlsProposed=0.) if self.ruleTyperuleType==RuleComplianceBase.MIN\
63  else -self._calc_delta_calc_delta(rlsProposed=0.)
64  maxDelta = np.nanmax(qDelta[:])
65  self.qCompCurqCompCur = max(0., maxDelta)
66  return self.qCompCurqCompCur
67 
68  # def calc_release(self, rlsProposed, *, rlsPrev=None, rlsUnCtrl=None, stor=None, qIn=None):
69  # rlsProposed = super().calc_release(
70  # rlsProposed, rlsPrev=rlsPrev, rlsUnCtrl=rlsUnCtrl, stor=stor, qIn=qIn)
71  # qComp = self.get_qcomp()
72  # return min(rlsProposed, qComp) if self.ruleType==RuleComplianceBase.MAX else max(rlsProposed, qComp)
def __init__(self, name, time, junctions, *ruleType=RuleComplianceBase.MAX)
def get_qcomp(self, *rlsPrev=None, rlsUnCtrl=None, stor=None, qIn=None, tsOffset=0)
def _calc_delta(self, rlsProposed)
def __init__(self, name, time, junctions)