Skip to content

pyrc.IRCClient

Client class for setting up a connection to an IRC server

Attributes:

Name Type Description
host Union[None, str]

a string representing the host that the client is connected to

port Union[None, int]

the port number the client is connected to

Source code in pyrc/client.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
class IRCClient:
    r"""
    Client class for setting up a connection to an IRC server

    :ivar host: a string representing the host that the client is connected to
    :ivar port: the port number the client is connected to
    """

    def __init__(self, **kwargs):
        """
        Initialise the client state. No connection is opened here

        :param kwargs: Optional settings. ``override_default_events`` is a callable that receives the
            client and is run instead of ``pyrc.setup``; ``loading`` is stored as ``load_behavior``
            and defaults to ``"lazy"``
        """
        self._events: Dict[str, List[Callable]] = {}
        self._task: Union[None, asyncio.Task] = None
        self._reader: Union[None, asyncio.StreamReader] = None
        self._writer: Union[None, asyncio.StreamWriter] = None
        # Keys are tuples of raw verbs. Dict keys must be hashable, so lists cannot be used here.
        self._named_events = {
            ("PRIVMSG",): "message",
            ("NOTICE",): "notice",
            ("403", "471", "473", "474", "475"): "join_fail",
        }
        self._exts = {}
        # Maps a capability name to the list of modules that depend on it
        self._depmap = {}
        self.host: Union[None, str] = None
        self.port: Union[None, int] = None
        self.ctcpchar = bytes("\x01", "UTF-8")
        self.cmdbuf = []
        self.cmdwait = False
        self.chmodemap: Dict[str, List[extension.Extension]] = {}
        self.channels = set()
        override_default_events: Union[None, Callable] = kwargs.get(
            "override_default_events"
        )
        self.load_behavior: Union[Literal["lazy"], Literal["active"]] = kwargs.get(
            "loading", "lazy"
        )
        self.parser = self.parse
        if override_default_events:
            override_default_events(self)
            return
        pyrc.setup(self)

    def add_module(self, module):
        """Adds and loads a new module to the client ecosystem

        All dependencies are checked before any state is touched, so a failed load leaves the
        client exactly as it was.

        :param module: The module instance to load
        :type module: pyrc.ext.extension.Extension
        :raises ExtensionFailed: If the module could not be initialized
        """
        if any(self._depmap.get(cap) is None for cap in module.depends):
            raise ExtensionFailed(
                f"Module '{module.name}' failed to load. One or more dependencies was not met"
            )
        for cap in module.depends:
            # NOTE(review): _exts is keyed by module name; this lookup assumes a capability is
            # named after the module providing it - confirm against the extension API
            module.interfaces[cap] = self._exts.get(cap)
            # Record the dependent so remove_module can cascade and clean up later
            if module not in self._depmap[cap]:
                self._depmap[cap].append(module)
        for prov in module.provides:
            self._depmap.setdefault(prov, [])
        self._exts[module.name] = module

    async def remove_module(self, module_name: str):
        """Removes a given module from the client

        Modules that depend on a capability provided by the removed module are removed as well.

        :param module_name: The module name to look for and remove
        :raises ExtensionNotFound: If the module is not found
        """
        mod = self._exts.get(module_name)
        if mod is None:
            raise ExtensionNotFound(
                f"Module '{module_name}' is either not loaded, or partially initialized"
            )
        self._exts.pop(module_name)
        for dep in mod.depends:
            await asyncio.sleep(0)
            dependents = self._depmap.get(dep)
            # The capability may already be gone (its provider is being removed right now)
            if dependents and mod in dependents:
                dependents.remove(mod)
        for prov in mod.provides:
            await asyncio.sleep(0)
            # Detach the list first so the recursive removals below cannot mutate what we iterate
            for other in self._depmap.pop(prov, []):
                await asyncio.sleep(0)
                # A dependent may already have been removed through another capability
                if other.name in self._exts:
                    await self.remove_module(other.name)
        await mod.teardown()

    async def _get_named_event(self, verb: str):
        """Gets a named event for use in event dispatching

        :param verb: The raw verb to convert
        :return: The named event matching the verb, or the verb if no named event exists
        """
        for keys, value in self._named_events.items():
            await asyncio.sleep(0)
            if verb in keys:
                return value
        return verb

    async def parse(self, message: str):
        """Parses a message, and returns the event name and Context (if the event takes a Context)

        :param message: The raw server message to parse
        :return: The event name, and the Context if applicable
        :raises IndexError: If the message has fewer than three space separated fields
        """
        params = message.split(" ", 2)
        author = IRCUser(params[0], self.chmodemap, self)
        verb = params[1]
        if verb in (
            "422",
            "376",
        ):  # MOTD or NOMOTD verbs, this means that the connection has registered and on_ready can be emitted
            return "ready", None
        args = params[2]
        # Split the middle parameters from the trailing one, which starts at the first " :".
        # The trailing text keeps its leading ":" except for PRIVMSG, where it is stripped below.
        if args.startswith(":"):
            middle, msg = "", args
        else:
            middle, sep, trailing = args.partition(" :")
            msg = ":" + trailing if sep else ""
        channel = None
        # Only middle parameters can name a channel; a "#" inside the message text is just text
        for part in middle.split(" "):
            await asyncio.sleep(0)
            if part.startswith("#"):
                channel = await self.get_channel(part)
        if verb == "PRIVMSG":
            if msg.startswith(":"):
                msg = msg[1:]
            bmsg = bytes(msg, "UTF-8")
            if bmsg.startswith(self.ctcpchar) and bmsg.endswith(self.ctcpchar):
                ctcp = bmsg.strip(self.ctcpchar).decode("UTF-8")
                return "ctcp", Context(message, author, channel, msg, ctcp)
        named = await self._get_named_event(verb)
        context = Context(
            message, author, channel, msg, None, named if named != verb else None
        )
        return named, context

    async def get_channel(self, channel: str):
        """Gets the specified channel from the set of channels

        :param channel: The channel to get
        :return: The IRCChannel object, or None if it doesn't exist
        :rtype: IRCChannel | None
        """
        for chan in self.channels:
            await asyncio.sleep(0)
            if str(chan) == channel:
                return chan
        return None

    async def _dispatch_event(self, event: str, *args):
        """
        Event dispatcher for IRCClient
        :param event: Event to dispatch. Listeners are looked up under ``"on_" + event.lower()``
        :param args: Args to pass into the callbacks
        """
        events = self._events.get("on_" + event.lower())
        if events is None:
            logging.debug(f"No events to call for event {event}")
            return
        logging.debug(f"Dispatching on_{event} to {len(events)} listeners")
        await asyncio.gather(*(callback(*args) for callback in events))

    def event(self, func, events: Union[str, List[str], None] = None):
        """
        Decorator to create a callback for events
        :param func: Coroutine to call when the event is raised
        :type func: coroutine
        :param events: Event name(s), if the coro's name is not the event name
        :raises TypeError: If ``func`` is not a coroutine function
        :return: ``func`` unchanged, so the decorated name stays bound to the coroutine function
        """
        if not asyncio.iscoroutinefunction(func):
            raise TypeError(
                f'Function "{func.__name__}" is type "{type(func)}", not a Coroutine'
            )
        evnt = func.__name__ if not events else events
        logging.debug(f"Registering callback for event(s) {evnt}")
        names = evnt if isinstance(evnt, list) else [evnt]
        for name in names:
            self._events.setdefault(name, []).append(func)
        return func

    async def _loop(self):
        """
        Event loop for the client. This should not be invoked by anything other than IRCClient itself
        """
        while not self._writer.is_closing():
            plaintext = ""
            try:
                data = await self._reader.readline()
                if not data:  # EOF: the connection is closed, so the loop can end
                    return
                # Not every network sends valid UTF-8; never let one bad byte kill the client
                plaintext = data.decode("utf-8", errors="replace").rstrip("\r\n")
                if not plaintext:  # A blank line is not EOF, just nothing to handle
                    continue
                await self._dispatch_event("raw", plaintext)
                logging.debug(f"Received: {plaintext}")
                if plaintext.startswith("PING"):  # Respond to PINGs with PONGs
                    # Only swap the verb; the token must be echoed back untouched
                    await self.send("PONG" + plaintext[4:])
                    continue
                event, ctx = await self.parser(plaintext)
                if ctx:
                    await self._dispatch_event(event, ctx)
                    continue
                await self._dispatch_event(event)
            except IndexError:
                logging.warning(f"Didn't understand {plaintext}")
                continue
            except ConnectionResetError:
                logging.error("Connection lost.")
                await self._dispatch_event("disconnect")
                return
            except Exception:
                logging.exception("Unhandled error in the client loop")
                await self.disconnect("Library error. Disconnected")
                return

    async def send(self, message: str):
        """
        Send a raw message to an IRC server

        While ``cmdwait`` is set, everything except PONG is appended to ``cmdbuf`` instead of sent.
        :param message: Raw IRC command to send
        :raises NotConnectedError: If the client is not actually connected yet
        """
        if self._writer is None:
            raise NotConnectedError(
                "This IRCClient is not yet connected, call .connect first!"
            )  # noqa: F405
        if self.cmdwait and not message.startswith("PONG"):
            logging.debug(
                f'Command "{message}" sent before connection was ready, deferring...'
            )
            self.cmdbuf.append(message)
            return
        encoded_msg = bytes(message + "\r\n", "UTF-8")
        self._writer.write(encoded_msg)
        await self._writer.drain()
        logging.debug(f"Sent: {message}")

    async def connect(self, host: str, port: int, username: str, **kwargs):
        """
        Connects to the IRC server
        :param host: Hostname or IP for the IRCd
        :param port: Port number for the IRCd
        :param username: Username to use when authenticating with IRC
        :param kwargs: Additional args to pass to the client on connect. ``ssl``, ``nick``,
            ``password`` and ``realname`` are read
        """
        self.host = host
        self.port = port
        use_ssl = kwargs.get("ssl")
        nickname = kwargs.get("nick")
        password = kwargs.get("password")
        logging.info(
            f"Attempting to connect to IRCd at {host}:{'+' if use_ssl else ''}{port}"
        )
        self._reader, self._writer = await asyncio.open_connection(
            self.host, self.port, ssl=use_ssl
        )
        self._task = asyncio.create_task(self._loop())
        realname = kwargs.get("realname")
        if nickname is None:
            nickname = username
        if password is not None:
            logging.debug("Attempting to authenticate with the provided password")
            await self.send(f"PASS {password}")
        await self.send(f"NICK {nickname}")
        await self.send(
            f"USER {username} 8 * :{realname if realname is not None else username}"
        )
        await self._dispatch_event("connect")
        # From here on send() buffers commands in cmdbuf instead of writing them
        self.cmdwait = True

    async def disconnect(self, quit_message: str = None):
        """
        Immediately signals to the IRC server that the client is disconnecting and disconnects
        :param quit_message: The message to show on the IRCd. 'Quit' if no parameter is provided
        :raises NotConnectedError: If the client was never connected
        """
        # QUIT must go out now; while cmdwait is set send() would only park it in cmdbuf
        self.cmdwait = False
        try:
            await self.send(f"QUIT :{quit_message if quit_message else 'Quit'}")
        except ConnectionError:
            logging.debug("Connection already lost, QUIT could not be delivered")
        self._writer.close()
        # The read loop calls disconnect() itself on errors; a task must never await itself
        if self._task is not None and self._task is not asyncio.current_task():
            await self._task
        logging.info("Disconnected and loop closed.")
        await self._dispatch_event("disconnect")

    async def join_channel(self, channel: Union[str, List[str]]):
        """
        Joins a channel or list of channels

        This is an async generator: exactly one value is yielded per requested channel.
        :param channel: A string representing a channel, or a list of channels
        :return: Yields the joined IRCChannel, or None if joining that channel failed or timed out
        :rtype: Iterable[IRCChannel | None]
        """
        targets = channel if isinstance(channel, list) else [channel]
        for chan in targets:
            await self.send(f"JOIN {chan}")
            try:
                res = await self.wait_for(
                    ["on_join", "on_join_fail"],
                    # Accept events without a resolved channel, or for this exact channel
                    lambda ctx, chan=chan: ctx is not None
                    and (ctx.channel is None or str(ctx.channel) == chan),
                )
            except asyncio.TimeoutError:
                yield None
                continue
            if not res or res.event == "join_fail":
                yield None
            else:
                yield res.channel

    async def ctcpreply(self, nick: str, query: str, reply: str):
        """
        Reply to a received CTCP query from the given nick
        :param nick: The nick of the user who sent us the CTCP query
        :param query: The query they sent us, so we know what we're responding to
        :param reply: What we're responding with
        """
        # The payload contains spaces, so it has to be sent as a ":"-prefixed trailing parameter
        await self.send(f"NOTICE {nick} :\x01{query} {reply}\x01")

    async def ctcp(self, nick: str, query: str):
        """
        Send a CTCP query to the specified `nick`
        :param nick: The nickname of the user we are sending the CTCP to
        :param query: The query we are sending
        """
        await self.send(f"PRIVMSG {nick} :\x01{query}\x01")

    async def wait_for(
        self,
        event: Union[str, list],
        check: Callable = lambda args: True,
        timeout: float = 30,
    ):
        """Waits for a specified event to occur, and returns the result

        :param event: The name (or names) of the event(s) to wait for. Returns the first event that matches
        :param check: A Callable that checks the context of the event and returns a bool
        :param timeout: Will raise a TimeoutError if the time is exceeded, defaults to 30
        :raises asyncio.TimeoutError: If the event is not triggered in time
        :return: The Context of the event that passed `check`
        :rtype: pyrc.Context | False
        """
        result = False
        done = asyncio.Event()

        async def inner(context=None):
            nonlocal result
            if check(context):
                result = context
                done.set()

        names = event if isinstance(event, list) else [event]
        self.event(inner, event)
        try:
            await asyncio.wait_for(done.wait(), timeout)
        finally:
            # Always unregister the temporary listener, but never swallow the timeout:
            # no `return` may live in this block
            for name in names:
                listeners = self._events.get(name, [])
                if inner in listeners:
                    listeners.remove(inner)
        return result

    async def load_extension(self, ext: str):
        """Load an extension

        :param ext: The import name of the module (e.g. cogs.myextension to import myextension from the cogs/ folder)
        :raises ExtensionFailed: If the extension couldn't be loaded
        """
        try:
            mod = importlib.import_module(ext)
        except ModuleNotFoundError as e:
            raise ExtensionFailed(
                f"Extension '{ext}' could not be loaded: Module not found"
            ) from e
        setup = getattr(mod, "setup", None)
        if setup is None:
            raise ExtensionFailed(
                f"Extension '{ext}' does not implement a 'setup' function"
            )
        try:
            setup(self, ext)
        except AttributeError as e:
            # Still an ExtensionFailed for callers, but no longer blamed on a missing setup()
            raise ExtensionFailed(
                f"Extension '{ext}' raised an AttributeError during setup: {e}"
            ) from e
        logging.debug(f"Extension '{ext}' was successfully set up")

    async def unload_extension(self, ext: str):
        """Unloads the given extension

        :param ext: The extension to unload
        """
        # Snapshot the items: remove_module mutates _exts and may cascade to other modules
        for name, mod in list(self._exts.items()):
            if name in self._exts and mod.importby == ext:
                await self.remove_module(name)

add_module(module)

Adds and loads a new module to the client ecosystem

Parameters:

Name Type Description Default
module pyrc.ext.extension.Extension

The module instance to load

required

Raises:

Type Description
ExtensionFailed

If the module could not be initialized

Source code in pyrc/client.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def add_module(self, module):
    """Adds and loads a new module to the client ecosystem

    All dependencies are checked before any state is touched, so a failed load leaves the
    client exactly as it was.

    :param module: The module instance to load
    :type module: pyrc.ext.extension.Extension
    :raises ExtensionFailed: If the module could not be initialized
    """
    if any(self._depmap.get(cap) is None for cap in module.depends):
        raise ExtensionFailed(
            f"Module '{module.name}' failed to load. One or more dependencies was not met"
        )
    for cap in module.depends:
        # NOTE(review): _exts is keyed by module name; this lookup assumes a capability is
        # named after the module providing it - confirm against the extension API
        module.interfaces[cap] = self._exts.get(cap)
        # Record the dependent so remove_module can cascade and clean up later
        if module not in self._depmap[cap]:
            self._depmap[cap].append(module)
    for prov in module.provides:
        self._depmap.setdefault(prov, [])
    self._exts[module.name] = module

connect(host, port, username, **kwargs) async

Connects to the IRC server

Parameters:

Name Type Description Default
host str

Hostname or IP for the IRCd

required
port int

Port number for the IRCd

required
username str

Username to use when authenticating with IRC

required
kwargs

Additional args to pass to the client on connect

{}
Source code in pyrc/client.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
async def connect(self, host: str, port: int, username: str, **kwargs):
    """
    Open the connection to the IRC server and register with it
    :param host: Hostname or IP for the IRCd
    :param port: Port number for the IRCd
    :param username: Username to use when authenticating with IRC
    :param kwargs: Additional args to pass to the client on connect. ``ssl``, ``nick``,
        ``password`` and ``realname`` are read
    """
    self.host, self.port = host, port
    use_ssl = kwargs.get("ssl")
    password = kwargs.get("password")
    realname = kwargs.get("realname")
    nickname = kwargs.get("nick")
    if nickname is None:
        # No explicit nick given: fall back to the username
        nickname = username
    logging.info(
        f"Attempting to connect to IRCd at {host}:{'+' if use_ssl else ''}{port}"
    )
    streams = await asyncio.open_connection(self.host, self.port, ssl=use_ssl)
    self._reader, self._writer = streams
    # Start reading before registering so server replies are not missed
    self._task = asyncio.create_task(self._loop())
    if password is not None:
        logging.debug("Attempting to authenticate with the provided password")
        await self.send(f"PASS {password}")
    await self.send(f"NICK {nickname}")
    await self.send(
        f"USER {username} 8 * :{realname if realname is not None else username}"
    )
    await self._dispatch_event("connect")
    # From here on send() buffers commands in cmdbuf instead of writing them
    self.cmdwait = True

ctcp(nick, query) async

Send a CTCP query to the specified nick

Parameters:

Name Type Description Default
nick str

The nickname of the user we are sending the CTCP to

required
query str

The query we are sending

required
Source code in pyrc/client.py
342
343
344
345
346
347
348
async def ctcp(self, nick: str, query: str):
    """
    Issue a CTCP query to `nick`, wrapped in the CTCP delimiter bytes
    :param nick: The nickname of the user we are sending the CTCP to
    :param query: The query we are sending
    """
    line = f"PRIVMSG {nick} :\x01{query}\x01"
    await self.send(line)

ctcpreply(nick, query, reply) async

Reply to a received CTCP query from the given nick

Parameters:

Name Type Description Default
nick str

The nick of the user who sent us the CTCP query

required
query str

The query they sent us, so we know what we're responding to

required
reply str

What we're responding with

required
Source code in pyrc/client.py
333
334
335
336
337
338
339
340
async def ctcpreply(self, nick: str, query: str, reply: str):
    """
    Reply to a received CTCP query from the given nick
    :param nick: The nick of the user who sent us the CTCP query
    :param query: The query they sent us, so we know what we're responding to
    :param reply: What we're responding with
    """
    # The payload contains spaces, so it has to be sent as a ":"-prefixed trailing parameter;
    # without the colon the server would cut it off at the first space
    await self.send(f"NOTICE {nick} :\x01{query} {reply}\x01")

disconnect(quit_message=None) async

Immediately signals to the IRC server that the client is disconnecting and disconnects

Parameters:

Name Type Description Default
quit_message str

The message to show on the IRCd. 'Quit' if no parameter is provided

None

Returns:

Type Description
Source code in pyrc/client.py
287
288
289
290
291
292
293
294
295
296
297
async def disconnect(self, quit_message: str = None):
    """
    Immediately signals to the IRC server that the client is disconnecting and disconnects
    :param quit_message: The message to show on the IRCd. 'Quit' if no parameter is provided
    """
    # QUIT must go out now; while cmdwait is set send() would only park it in cmdbuf
    self.cmdwait = False
    try:
        await self.send(f"QUIT :{quit_message if quit_message else 'Quit'}")
    except ConnectionError:
        logging.debug("Connection already lost, QUIT could not be delivered")
    self._writer.close()
    # The read loop may call disconnect() itself; a task must never await itself
    if self._task is not None and self._task is not asyncio.current_task():
        await self._task
    logging.info("Disconnected and loop closed.")
    await self._dispatch_event("disconnect")

event(func, events=None)

Decorator to create a callback for events

Parameters:

Name Type Description Default
func coroutine

Coroutine to call when the event is raised

required
events Union[str, List[str], None]

Event name(s), if the coro's name is not the event name

None
Source code in pyrc/client.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def event(self, func, events: Union[str, List[str], None] = None):
    """
    Decorator to create a callback for events
    :param func: Coroutine to call when the event is raised
    :type func: coroutine
    :param events: Event name(s), if the coro's name is not the event name
    :raises TypeError: If ``func`` is not a coroutine function
    :return: ``func`` unchanged, so the decorated name stays bound to the coroutine function
    """
    if not asyncio.iscoroutinefunction(func):
        raise TypeError(
            f'Function "{func.__name__}" is type "{type(func)}", not a Coroutine'
        )
    evnt = func.__name__ if not events else events
    logging.debug(f"Registering callback for event(s) {evnt}")
    names = evnt if isinstance(evnt, list) else [evnt]
    for name in names:
        self._events.setdefault(name, []).append(func)
    return func

get_channel(channel) async

Gets the specified channel from the set of channels

Parameters:

Name Type Description Default
channel str

The channel to get

required

Returns:

Type Description
IRCChannel | None

The IRCChannel object, or None if it doesn't exist

Source code in pyrc/client.py
155
156
157
158
159
160
161
162
163
164
165
166
async def get_channel(self, channel: str):
    """Look up a channel by name among the channels the client knows about

    :param channel: The channel to get
    :return: The first known channel whose string form equals ``channel``, or None if there is none
    :rtype: IRCChannel | None
    """
    for candidate in self.channels:
        # Yield to the event loop between comparisons
        await asyncio.sleep(0)
        if str(candidate) != channel:
            continue
        return candidate
    return None

join_channel(channel) async

Joins a channel or list of channels

Parameters:

Name Type Description Default
channel Union[str, List[str]]

A string representing a channel, or a list of channels

required

Returns:

Type Description
Iterable[IRCChannel | None]

An IRCChannel or list of IRCChannels. One or more channels will be None if joining a channel failed

Source code in pyrc/client.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
async def join_channel(self, channel: Union[str, List[str]]):
    """
    Joins a channel or list of channels

    This is an async generator: exactly one value is yielded per requested channel.
    :param channel: A string representing a channel, or a list of channels
    :return: Yields the joined IRCChannel, or None if joining that channel failed or timed out
    :rtype: Iterable[IRCChannel | None]
    """
    targets = channel if isinstance(channel, list) else [channel]
    for chan in targets:
        await self.send(f"JOIN {chan}")
        try:
            res = await self.wait_for(
                ["on_join", "on_join_fail"],
                # Accept events without a resolved channel, or for this exact channel
                lambda ctx, chan=chan: ctx is not None
                and (ctx.channel is None or str(ctx.channel) == chan),
            )
        except asyncio.TimeoutError:
            yield None
            continue
        # A falsy result means nothing matched; treat it like a failed join
        if not res or res.event == "join_fail":
            yield None
        else:
            yield res.channel

load_extension(ext) async

Load an extension

Parameters:

Name Type Description Default
ext str

The import name of the module (e.g. cogs.myextension to import myextension from the cogs/ folder)

required

Raises:

Type Description
ExtensionFailed

If the extension couldn't be loaded

Source code in pyrc/client.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
async def load_extension(self, ext: str):
    """Load an extension

    :param ext: The import name of the module (e.g. cogs.myextension to import myextension from the cogs/ folder)
    :raises ExtensionFailed: If the extension couldn't be loaded
    """
    try:
        mod = importlib.import_module(ext)
    except ModuleNotFoundError as e:
        raise ExtensionFailed(
            f"Extension '{ext}' could not be loaded: Module not found"
        ) from e
    setup = getattr(mod, "setup", None)
    if setup is None:
        raise ExtensionFailed(
            f"Extension '{ext}' does not implement a 'setup' function"
        )
    try:
        setup(self, ext)
    except AttributeError as e:
        # Still an ExtensionFailed for callers, but no longer blamed on a missing setup()
        raise ExtensionFailed(
            f"Extension '{ext}' raised an AttributeError during setup: {e}"
        ) from e
    logging.debug(f"Extension '{ext}' was successfully set up")

parse(message) async

Parses a message, and returns the event name and Context (if the event takes a Context)

Parameters:

Name Type Description Default
message str

The raw server message to parse

required

Returns:

Type Description

The event name, and the Context if applicable

Source code in pyrc/client.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
async def parse(self, message: str):
    """Parses a message, and returns the event name and Context (if the event takes a Context)

    :param message: The raw server message to parse
    :return: The event name, and the Context if applicable
    :raises IndexError: If the message has fewer than three space separated fields
    """
    params = message.split(" ", 2)
    author = IRCUser(params[0], self.chmodemap, self)
    verb = params[1]
    if verb in (
        "422",
        "376",
    ):  # MOTD or NOMOTD verbs, this means that the connection has registered and on_ready can be emitted
        return "ready", None
    args = params[2]
    # Split the middle parameters from the trailing one, which starts at the first " :".
    # The trailing text keeps its leading ":" except for PRIVMSG, where it is stripped below.
    if args.startswith(":"):
        middle, msg = "", args
    else:
        middle, sep, trailing = args.partition(" :")
        msg = ":" + trailing if sep else ""
    channel = None
    # Only middle parameters can name a channel; a "#" inside the message text is just text
    for part in middle.split(" "):
        await asyncio.sleep(0)
        if part.startswith("#"):
            channel = await self.get_channel(part)
    if verb == "PRIVMSG":
        if msg.startswith(":"):
            msg = msg[1:]
        bmsg = bytes(msg, "UTF-8")
        if bmsg.startswith(self.ctcpchar) and bmsg.endswith(self.ctcpchar):
            ctcp = bmsg.strip(self.ctcpchar).decode("UTF-8")
            return "ctcp", Context(message, author, channel, msg, ctcp)
    named = await self._get_named_event(verb)
    context = Context(
        message, author, channel, msg, None, named if named != verb else None
    )
    return named, context

remove_module(module_name) async

Removes a given module from the client

Parameters:

Name Type Description Default
module_name str

The module name to look for and remove

required

Raises:

Type Description
ExtensionNotFound

If the module is not found

Source code in pyrc/client.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
async def remove_module(self, module_name: str):
    """Removes a given module from the client

    Modules that depend on a capability provided by the removed module are removed as well.

    :param module_name: The module name to look for and remove
    :raises ExtensionNotFound: If the module is not found
    """
    mod = self._exts.get(module_name)
    if mod is None:
        raise ExtensionNotFound(
            f"Module '{module_name}' is either not loaded, or partially initialized"
        )
    self._exts.pop(module_name)
    for dep in mod.depends:
        await asyncio.sleep(0)
        dependents = self._depmap.get(dep)
        # The capability may already be gone (its provider is being removed right now),
        # or this module may never have been recorded as a dependent
        if dependents and mod in dependents:
            dependents.remove(mod)
    for prov in mod.provides:
        await asyncio.sleep(0)
        # Detach the list first so the recursive removals below cannot mutate what we iterate
        for other in self._depmap.pop(prov, []):
            await asyncio.sleep(0)
            # A dependent may already have been removed through another capability
            if other.name in self._exts:
                await self.remove_module(other.name)
    await mod.teardown()

send(message) async

Send a raw message to an IRC server

Parameters:

Name Type Description Default
message str

Raw IRC command to send

required

Raises:

Type Description
NotConnectedError

If the client is not actually connected yet

Source code in pyrc/client.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
async def send(self, message: str):
    """
    Write one raw IRC line to the server, or buffer it while the client is waiting

    While ``cmdwait`` is set, everything except PONG is appended to ``cmdbuf`` instead of sent.
    :param message: Raw IRC command to send
    :raises NotConnectedError: If the client is not actually connected yet
    """
    writer = self._writer
    if writer is None:
        raise NotConnectedError(
            "This IRCClient is not yet connected, call .connect first!"
        )  # noqa: F405
    is_pong = message.startswith("PONG")
    if self.cmdwait and not is_pong:
        logging.debug(
            f'Command "{message}" sent before connection was ready, deferring...'
        )
        self.cmdbuf.append(message)
        return
    # IRC lines are terminated with CRLF
    writer.write((message + "\r\n").encode("UTF-8"))
    await writer.drain()
    logging.debug(f"Sent: {message}")

unload_extension(ext) async

Unloads the given extension

Parameters:

Name Type Description Default
ext str

The extension to unload

required
Source code in pyrc/client.py
406
407
408
409
410
411
412
413
async def unload_extension(self, ext: str):
    """Unloads the given extension

    Every loaded module whose ``importby`` equals ``ext`` is removed.

    :param ext: The extension to unload
    """
    # Snapshot the items: remove_module mutates _exts and may cascade to other modules
    for name, mod in list(self._exts.items()):
        if name in self._exts and mod.importby == ext:
            await self.remove_module(name)

wait_for(event, check=lambda args: True, timeout=30) async

Waits for a specified event to occur, and returns the result

Parameters:

Name Type Description Default
event Union[str, list]

The name (or names) of the event(s) to wait for. Returns the first event that matches

required
check Callable

A Callable that checks the context of the event and returns a bool

lambda args: True
timeout float

Will raise a TimeoutError if the time is exceeded, defaults to 30

30

Returns:

Type Description
pyrc.Context | False

The Context of the event that passed check

Raises:

Type Description
asyncio.TimeoutError

If the event is not triggered in time

Source code in pyrc/client.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
async def wait_for(
    self,
    event: Union[str, list],
    check: Callable = lambda args: True,
    timeout: float = 30,
):
    """Waits for a specified event to occur, and returns the result

    :param event: The name (or names) of the event(s) to wait for. Returns the first event that matches
    :param check: A Callable that checks the context of the event and returns a bool
    :param timeout: Will raise a TimeoutError if the time is exceeded, defaults to 30
    :raises asyncio.TimeoutError: If the event is not triggered in time
    :return: The Context of the event that passed `check`
    :rtype: pyrc.Context | False
    """
    result = False
    done = asyncio.Event()

    async def inner(context=None):
        nonlocal result
        if check(context):
            result = context
            done.set()

    names = event if isinstance(event, list) else [event]
    self.event(inner, event)
    try:
        await asyncio.wait_for(done.wait(), timeout)
    finally:
        # Always unregister the temporary listener, but never swallow the timeout:
        # no `return` may live in this block
        for name in names:
            listeners = self._events.get(name, [])
            if inner in listeners:
                listeners.remove(inner)
    return result