过滤
注意
下面的大多数一行程序假设w3
是一个 web3.Web3
实例;例如,可通过以下方式获得:
from web3.auto import w3
web3.eth.Eth.filter()
方法可用于设置过滤器:
-
待处理交易:
web3.eth.filter('pending')
-
新块
web3.eth.filter('latest')
-
事件日志
is manually constructed through the contract instance API:
py event_filter = mycontract.events.myEvent.createFilter(fromBlock='latest', argument_filters={'arg1':10})
or by providing the valid filter parameter of
py event_filter = w3.eth.filter({"address": contract_address})
-
附加到现有过滤器
py existing_filter = w3.eth.filter(filter_id="0x0")
T6】
注意
创建事件过滤器要求您的 Ethereum 节点为过滤器启用了 API 支持。注意,Infura 对过滤器的支持不提供对待定过滤器的访问。要获取其他无状态节点上的事件日志,请参见 web3.contract.ContractEvents
。
过滤器类别
*class* web3.utils.filters.Filter(*web3*, *filter_id*)
Filter.filter_id
创建此过滤器时由eth_newFilter
RPC 方法返回的此过滤器的filter_id
。
Filter.get_new_entries()
检索此过滤器的新条目。
将使用 web3.eth.Eth.get_filter_changes()
检索日志,它仅返回自上次轮询以来的新条目。
Filter.get_all_entries()
检索此过滤器的所有条目。
将使用 web3.eth.Eth.get_filter_logs()
检索日志,它返回所有匹配给定过滤器的条目。
Filter.format_entry(*entry*)
钩子,用于子类修改过滤器返回的日志条目的格式,或者传递给它的回调函数。
默认情况下,这将返回未修改的entry
参数。
Filter.is_valid_entry(*entry*)
子类增加额外的程序过滤的钩子。默认实现总是返回True
。
块和事务过滤器类
*class* web3.utils.filters.BlockFilter(*...*)
BlockFilter
是 Filter
的子类。
您可以使用web3.eth.filter('latest')
为新块设置一个过滤器,它将返回一个新的 BlockFilter
对象。
py new_block_filter = w3.eth.filter('latest') new_block_filter.get_new_entries()
Note that
eth_newBlockFilter
does not yet support"safe"
and"finalized"
block identifiers.
*class* web3.utils.filters.TransactionFilter(*...*)
TransactionFilter
是 Filter
的子类。
您可以使用web3.eth.filter('pending')
为新块设置一个过滤器,它将返回一个新的 TransactionFilter
对象。
py new_transaction_filter = w3.eth.filter('pending') new_transaction_filter.get_new_entries()
T6】
事件日志过滤器
您可以使用 web3.py 合约 api: web3.contract.Contract.events.your_event_name.createFilter()
为事件日志设置过滤器,这为创建事件日志过滤器提供了一些便利。参考下面的例子:
py event_filter = myContract.events.<event_name>.createFilter(fromBlock="latest", argument_filters={'arg1':10}) event_filter.get_new_entries()
T6】
更多信息参见 web3.contract.Contract.events.your_event_name.createFilter()
文档。
您可以通过提供一个包含标准过滤器参数的字典来设置一个事件日志过滤器,就像上面的web3.eth.filter
一样。假设arg1
被索引,等效的过滤器创建将看起来像:
py event_signature_hash = web3.keccak(text="eventName(uint32)").hex() event_filter = web3.eth.filter({ "address": myContract_address, "topics": [event_signature_hash, "0x000000000000000000000000000000000000000000000000000000000000000a"], })
T6】
topics
参数是顺序相关的。对于非匿名事件,主题列表中的第一项总是事件签名的 keccack 散列。后续主题项目是索引事件参数的十六进制编码值。在上面的例子中,第二项是编码成十六进制字符串表示的arg1
值10
。
除了依赖于顺序之外,在指定主题过滤器时,还有几点需要注意:
给定一个包含主题[甲,乙]的事务日志,以下主题过滤器将产生一个匹配:
- [] "Anything"
- [a] "A comes first (and anything after that)"
- [None, B] "Anything comes first, B comes second (and anything after that)"
- [A, B] "A is in the first place and B is in the second place (and anything after that)"
- [[A, B], [A, B]] "(A or B) in the first place, (A or B) in the second place (and anything after that) "
有关标准过滤器参数的更多信息,请参见 JSON-RPC 文档。
Note
Although the block identifiers
"latest"
and"safe"
are not part of theeth_newFilter
specification, they are supported by web3.py, and may or may not produce the expected results depending on the accessed node.
通过以上任一方法创建日志过滤器都将返回一个 LogFilter
实例。
*class* web3.utils.filters.LogFilter(*web3*, *filter_id*, *log_entry_formatter=None*, *data_filter_set=None*)
LogFilter
类是 Filter
的子类。参见 Filter
文档了解继承的方法。
LogFilter
提供了以下附加方法:
LogFilter.set_data_filters(*data_filter_set*)
提供了一种过滤日志数据的方法,换句话说,就是能够过滤来自未编制索引的事件参数的值。参数data_filter_set
应该是 32 字节十六进制编码值的列表或集合。
获取事件而不设置过滤器
您可以查询以太坊节点以直接获取事件,而无需先创建过滤器。这适用于所有节点类型。
示例见web3.contract.ContractEvents.getLogs()
。
示例:监听事件
同步的
```py from web3.auto import w3 import time
def handle_event(event): print(event)
def log_loop(event_filter, poll_interval): while True: for event in event_filter.get_new_entries(): handle_event(event) time.sleep(poll_interval)
def main(): block_filter = w3.eth.filter('latest') log_loop(block_filter, 2)
if name == 'main': main() ```
T6】
### 异步筛选器轮询
从 web3 版本 4 开始,watch
方法被从 web3 过滤器对象中移除。在设计一个关于线程和并发的系统时,需要做出许多决定。web3 没有强迫用户做出决定,而是让用户自己选择。下面是异步过滤器事件处理的一些示例实现,可以作为起点。
使用async
和await
的单线程并发
从 python 3.5 开始,添加了async
和await
内置关键字。这些为协程提供了一个共享的 api,可以被内置的 asyncio 等模块使用。下面是一个使用 asyncio 的示例事件循环,它轮询多个 web3 过滤器对象,并将新条目传递给一个处理程序。
```py from web3.auto import w3 import asyncio
def handle_event(event): print(event) # and whatever
async def log_loop(event_filter, poll_interval): while True: for event in event_filter.get_new_entries(): handle_event(event) await asyncio.sleep(poll_interval)
def main(): block_filter = w3.eth.filter('latest') tx_filter = w3.eth.filter('pending') loop = asyncio.get_event_loop() try: loop.run_until_complete( asyncio.gather( log_loop(block_filter, 2), log_loop(tx_filter, 2))) finally: loop.close()
if name == 'main': main() ```
Read the document asyncio for more information.
在单独的线程中运行事件循环
这是上述示例的扩展版本,其中事件循环在一个单独的线程中运行,为其他任务释放main
函数。
```py from web3.auto import w3 from threading import Thread import time
def handle_event(event): print(event) # and whatever
def log_loop(event_filter, poll_interval): while True: for event in event_filter.get_new_entries(): handle_event(event) time.sleep(poll_interval)
def main(): block_filter = w3.eth.filter('latest') worker = Thread(target=log_loop, args=(block_filter, 5), daemon=True) worker.start() # .. do some other stuff
if name == 'main': main() ```
T6】
以下是一些为编写异步 python 提供框架的其他库: